mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-23 14:58:44 +00:00
commit
8b47dba05d
@ -762,6 +762,116 @@ func TestTapRegexMasking(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTapIgnoredUserAgents(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("ignored acceptance test")
|
||||
}
|
||||
|
||||
cliPath, cliPathErr := getCliPath()
|
||||
if cliPathErr != nil {
|
||||
t.Errorf("failed to get cli path, err: %v", cliPathErr)
|
||||
return
|
||||
}
|
||||
|
||||
tapCmdArgs := getDefaultTapCommandArgs()
|
||||
|
||||
tapNamespace := getDefaultTapNamespace()
|
||||
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
|
||||
|
||||
ignoredUserAgentValue := "ignore"
|
||||
tapCmdArgs = append(tapCmdArgs, "--set", fmt.Sprintf("tap.ignored-user-agents=%v", ignoredUserAgentValue))
|
||||
|
||||
tapCmd := exec.Command(cliPath, tapCmdArgs...)
|
||||
t.Logf("running command: %v", tapCmd.String())
|
||||
|
||||
t.Cleanup(func() {
|
||||
if err := cleanupCommand(tapCmd); err != nil {
|
||||
t.Logf("failed to cleanup tap command, err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
if err := tapCmd.Start(); err != nil {
|
||||
t.Errorf("failed to start tap command, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
apiServerUrl := getApiServerUrl(defaultApiServerPort)
|
||||
|
||||
if err := waitTapPodsReady(apiServerUrl); err != nil {
|
||||
t.Errorf("failed to start tap pods on time, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
proxyUrl := getProxyUrl(defaultNamespaceName, defaultServiceName)
|
||||
|
||||
ignoredUserAgentCustomHeader := "Ignored-User-Agent"
|
||||
headers := map[string]string {"User-Agent": ignoredUserAgentValue, ignoredUserAgentCustomHeader: ""}
|
||||
for i := 0; i < defaultEntriesCount; i++ {
|
||||
if _, requestErr := executeHttpGetRequestWithHeaders(fmt.Sprintf("%v/get", proxyUrl), headers); requestErr != nil {
|
||||
t.Errorf("failed to send proxy request, err: %v", requestErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < defaultEntriesCount; i++ {
|
||||
if _, requestErr := executeHttpGetRequest(fmt.Sprintf("%v/get", proxyUrl)); requestErr != nil {
|
||||
t.Errorf("failed to send proxy request, err: %v", requestErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
ignoredUserAgentsCheckFunc := func() error {
|
||||
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
|
||||
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt×tamp=%v", apiServerUrl, defaultEntriesCount * 2, timestamp)
|
||||
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
|
||||
if requestErr != nil {
|
||||
return fmt.Errorf("failed to get entries, err: %v", requestErr)
|
||||
}
|
||||
|
||||
entries := requestResult.([]interface{})
|
||||
if len(entries) == 0 {
|
||||
return fmt.Errorf("unexpected entries result - Expected more than 0 entries")
|
||||
}
|
||||
|
||||
for _, entryInterface := range entries {
|
||||
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, entryInterface.(map[string]interface{})["id"])
|
||||
requestResult, requestErr = executeHttpGetRequest(entryUrl)
|
||||
if requestErr != nil {
|
||||
return fmt.Errorf("failed to get entry, err: %v", requestErr)
|
||||
}
|
||||
|
||||
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
|
||||
entryJson := data["entry"].(string)
|
||||
|
||||
var entry map[string]interface{}
|
||||
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
|
||||
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
|
||||
}
|
||||
|
||||
entryRequest := entry["request"].(map[string]interface{})
|
||||
entryPayload := entryRequest["payload"].(map[string]interface{})
|
||||
entryDetails := entryPayload["details"].(map[string]interface{})
|
||||
|
||||
entryHeaders := entryDetails["headers"].([]interface{})
|
||||
for _, headerInterface := range entryHeaders {
|
||||
header := headerInterface.(map[string]interface{})
|
||||
if header["name"].(string) != ignoredUserAgentCustomHeader {
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("unexpected result - user agent is not ignored")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
if err := retriesExecute(shortRetriesCount, ignoredUserAgentsCheckFunc); err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestTapDumpLogs(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("ignored acceptance test")
|
||||
|
@ -172,6 +172,21 @@ func executeHttpRequest(response *http.Response, requestErr error) (interface{},
|
||||
return jsonBytesToInterface(data)
|
||||
}
|
||||
|
||||
func executeHttpGetRequestWithHeaders(url string, headers map[string]string) (interface{}, error) {
|
||||
request, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for headerKey, headerValue := range headers {
|
||||
request.Header.Add(headerKey, headerValue)
|
||||
}
|
||||
|
||||
client := &http.Client{}
|
||||
response, requestErr := client.Do(request)
|
||||
return executeHttpRequest(response, requestErr)
|
||||
}
|
||||
|
||||
func executeHttpGetRequest(url string) (interface{}, error) {
|
||||
response, requestErr := http.Get(url)
|
||||
return executeHttpRequest(response, requestErr)
|
||||
|
@ -4,6 +4,13 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mizuserver/pkg/api"
|
||||
@ -18,15 +25,6 @@ import (
|
||||
"path/filepath"
|
||||
"plugin"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
|
||||
@ -59,7 +57,7 @@ func main() {
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
|
||||
|
||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
|
||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
||||
|
||||
hostApi(nil)
|
||||
@ -77,7 +75,7 @@ func main() {
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
|
||||
socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress)
|
||||
socketConnection, _, err := websocket.DefaultDialer.Dial(*apiServerAddress, nil)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
|
||||
}
|
||||
@ -90,7 +88,7 @@ func main() {
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
|
||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
||||
|
||||
hostApi(outputItemsChannel)
|
||||
@ -98,7 +96,7 @@ func main() {
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
|
||||
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions)
|
||||
go filterItems(outputItemsChannel, filteredHarChannel)
|
||||
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
|
||||
hostApi(nil)
|
||||
}
|
||||
@ -230,7 +228,7 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
|
||||
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
|
||||
if filteringOptionsJson == "" {
|
||||
return &tapApi.TrafficFilteringOptions{
|
||||
HealthChecksUserAgentHeaders: []string{},
|
||||
IgnoredUserAgents: []string{},
|
||||
}
|
||||
}
|
||||
var filteringOptions tapApi.TrafficFilteringOptions
|
||||
@ -242,42 +240,16 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
|
||||
return &filteringOptions
|
||||
}
|
||||
|
||||
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) {
|
||||
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
|
||||
for message := range inChannel {
|
||||
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||
continue
|
||||
}
|
||||
// TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441
|
||||
if isHealthCheckByUserAgent(message, filterOptions.HealthChecksUserAgentHeaders) {
|
||||
continue
|
||||
}
|
||||
|
||||
outChannel <- message
|
||||
}
|
||||
}
|
||||
|
||||
func isHealthCheckByUserAgent(item *tapApi.OutputChannelItem, userAgentsToIgnore []string) bool {
|
||||
if item.Protocol.Name != "http" {
|
||||
return false
|
||||
}
|
||||
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
|
||||
for _, header := range reqDetails["headers"].([]interface{}) {
|
||||
h := header.(map[string]interface{})
|
||||
if strings.ToLower(h["name"].(string)) == "user-agent" {
|
||||
for _, userAgent := range userAgentsToIgnore {
|
||||
if strings.Contains(strings.ToLower(h["value"].(string)), strings.ToLower(userAgent)) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) {
|
||||
if connection == nil {
|
||||
panic("Websocket connection is nil")
|
||||
|
@ -113,9 +113,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
json.Unmarshal([]byte(mizuEntry.Entry), &pair)
|
||||
harEntry, err := utils.NewEntry(&pair)
|
||||
if err == nil {
|
||||
rules, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
|
||||
rules, _, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
|
||||
baseEntry.Rules = rules
|
||||
baseEntry.Latency = mizuEntry.ElapsedTime
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -143,11 +143,13 @@ func GetEntry(c *gin.Context) {
|
||||
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
|
||||
|
||||
var rules []map[string]interface{}
|
||||
var isRulesEnabled bool
|
||||
if entryData.ProtocolName == "http" {
|
||||
var pair tapApi.RequestResponsePair
|
||||
json.Unmarshal([]byte(entryData.Entry), &pair)
|
||||
harEntry, _ := utils.NewEntry(&pair)
|
||||
_, rulesMatched := models.RunValidationRulesState(*harEntry, entryData.Service)
|
||||
_, rulesMatched, _isRulesEnabled := models.RunValidationRulesState(*harEntry, entryData.Service)
|
||||
isRulesEnabled = _isRulesEnabled
|
||||
inrec, _ := json.Marshal(rulesMatched)
|
||||
json.Unmarshal(inrec, &rules)
|
||||
}
|
||||
@ -158,6 +160,7 @@ func GetEntry(c *gin.Context) {
|
||||
BodySize: bodySize,
|
||||
Data: entryData,
|
||||
Rules: rules,
|
||||
IsRulesEnabled: isRulesEnabled,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -97,8 +97,8 @@ type ExtendedCreator struct {
|
||||
Source *string `json:"_source"`
|
||||
}
|
||||
|
||||
func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched) {
|
||||
resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
|
||||
func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched, bool) {
|
||||
resultPolicyToSend, isEnabled := rules.MatchRequestPolicy(harEntry, service)
|
||||
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend)
|
||||
return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend
|
||||
return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend, isEnabled
|
||||
}
|
||||
|
@ -4,11 +4,12 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/romana/rlog"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
jsonpath "github.com/yalp/jsonpath"
|
||||
@ -43,9 +44,11 @@ func ValidateService(serviceFromRule string, service string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
|
||||
enforcePolicy, _ := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
|
||||
var resultPolicyToSend []RulesMatched
|
||||
func MatchRequestPolicy(harEntry har.Entry, service string) (resultPolicyToSend []RulesMatched, isEnabled bool) {
|
||||
enforcePolicy, err := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
|
||||
if err == nil {
|
||||
isEnabled = true
|
||||
}
|
||||
for _, rule := range enforcePolicy.Rules {
|
||||
if !ValidatePath(rule.Path, harEntry.Request.URL) || !ValidateService(rule.Service, service) {
|
||||
continue
|
||||
@ -93,12 +96,12 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
|
||||
resultPolicyToSend = appendRulesMatched(resultPolicyToSend, true, rule)
|
||||
}
|
||||
}
|
||||
return resultPolicyToSend
|
||||
return
|
||||
}
|
||||
|
||||
func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
|
||||
var numberOfRulesMatched = len(rulesMatched)
|
||||
var latency int64 = -1
|
||||
var responseTime int64 = -1
|
||||
|
||||
if numberOfRulesMatched == 0 {
|
||||
return false, 0, numberOfRulesMatched
|
||||
@ -106,15 +109,15 @@ func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
|
||||
|
||||
for _, rule := range rulesMatched {
|
||||
if rule.Matched == false {
|
||||
return false, latency, numberOfRulesMatched
|
||||
return false, responseTime, numberOfRulesMatched
|
||||
} else {
|
||||
if strings.ToLower(rule.Rule.Type) == "latency" {
|
||||
if rule.Rule.Latency < latency || latency == -1 {
|
||||
latency = rule.Rule.Latency
|
||||
if strings.ToLower(rule.Rule.Type) == "responseTime" {
|
||||
if rule.Rule.ResponseTime < responseTime || responseTime == -1 {
|
||||
responseTime = rule.Rule.ResponseTime
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true, latency, numberOfRulesMatched
|
||||
return true, responseTime, numberOfRulesMatched
|
||||
}
|
||||
|
@ -1,38 +0,0 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/romana/rlog"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
DEFAULT_SOCKET_RETRIES = 3
|
||||
DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
|
||||
)
|
||||
|
||||
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
|
||||
var err error
|
||||
var connection *websocket.Conn
|
||||
try := 0
|
||||
|
||||
// Connection to server fails if client pod is up before server.
|
||||
// Retries solve this issue.
|
||||
for try < DEFAULT_SOCKET_RETRIES {
|
||||
rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES)
|
||||
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
|
||||
if err != nil {
|
||||
rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err)
|
||||
try++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return connection, nil
|
||||
}
|
@ -32,7 +32,7 @@ var logsCmd = &cobra.Command{
|
||||
|
||||
logger.Log.Debugf("Using file path %s", config.Config.Logs.FilePath())
|
||||
|
||||
if dumpLogsErr := fsUtils.DumpLogs(kubernetesProvider, ctx, config.Config.Logs.FilePath()); dumpLogsErr != nil {
|
||||
if dumpLogsErr := fsUtils.DumpLogs(ctx, kubernetesProvider, config.Config.Logs.FilePath()); dumpLogsErr != nil {
|
||||
logger.Log.Errorf("Failed dump logs %v", dumpLogsErr)
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
@ -68,7 +67,4 @@ func init() {
|
||||
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
|
||||
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules")
|
||||
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFileDeprecated, defaultTapConfig.EnforcePolicyFileDeprecated, "Yaml file with policy rules")
|
||||
tapCmd.Flags().MarkDeprecated(configStructs.EnforcePolicyFileDeprecated, fmt.Sprintf("Use --%s instead", configStructs.EnforcePolicyFile))
|
||||
}
|
||||
|
@ -8,6 +8,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
core "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/config/configStructs"
|
||||
@ -22,9 +26,6 @@ import (
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
yaml "gopkg.in/yaml.v3"
|
||||
core "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -36,7 +37,6 @@ type tapState struct {
|
||||
apiServerService *core.Service
|
||||
currentlyTappedPods []core.Pod
|
||||
mizuServiceAccountExists bool
|
||||
doNotRemoveConfigMap bool
|
||||
}
|
||||
|
||||
var state tapState
|
||||
@ -49,15 +49,8 @@ func RunMizuTap() {
|
||||
}
|
||||
|
||||
var mizuValidationRules string
|
||||
if config.Config.Tap.EnforcePolicyFile != "" || config.Config.Tap.EnforcePolicyFileDeprecated != "" {
|
||||
var trafficValidation string
|
||||
if config.Config.Tap.EnforcePolicyFile != "" {
|
||||
trafficValidation = config.Config.Tap.EnforcePolicyFile
|
||||
} else {
|
||||
trafficValidation = config.Config.Tap.EnforcePolicyFileDeprecated
|
||||
}
|
||||
|
||||
mizuValidationRules, err = readValidationRules(trafficValidation)
|
||||
if config.Config.Tap.EnforcePolicyFile != "" {
|
||||
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
@ -76,7 +69,7 @@ func RunMizuTap() {
|
||||
targetNamespaces := getNamespaces(kubernetesProvider)
|
||||
|
||||
if config.Config.IsNsRestrictedMode() {
|
||||
if len(targetNamespaces) != 1 || !mizu.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) {
|
||||
if len(targetNamespaces) != 1 || !shared.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) {
|
||||
logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+
|
||||
"You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.MizuResourcesNamespaceConfigName)
|
||||
return
|
||||
@ -84,7 +77,7 @@ func RunMizuTap() {
|
||||
}
|
||||
|
||||
var namespacesStr string
|
||||
if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
|
||||
if !shared.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
|
||||
namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \""))
|
||||
} else {
|
||||
namespacesStr = "all namespaces"
|
||||
@ -99,7 +92,7 @@ func RunMizuTap() {
|
||||
|
||||
if len(state.currentlyTappedPods) == 0 {
|
||||
var suggestionStr string
|
||||
if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
|
||||
if !shared.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
|
||||
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
|
||||
}
|
||||
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
|
||||
@ -109,18 +102,17 @@ func RunMizuTap() {
|
||||
return
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
||||
|
||||
defer finishMizuExecution(kubernetesProvider)
|
||||
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
||||
if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
|
||||
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions)
|
||||
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
|
||||
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions)
|
||||
|
||||
//block until exit signal or error
|
||||
// block until exit signal or error
|
||||
waitForFinish(ctx, cancel)
|
||||
}
|
||||
|
||||
@ -133,7 +125,7 @@ func readValidationRules(file string) (string, error) {
|
||||
return string(newContent), nil
|
||||
}
|
||||
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
|
||||
if !config.Config.IsNsRestrictedMode() {
|
||||
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
||||
return err
|
||||
@ -144,15 +136,8 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
return err
|
||||
}
|
||||
|
||||
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil {
|
||||
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
|
||||
state.doNotRemoveConfigMap = true
|
||||
} else if mizuValidationRules == "" {
|
||||
state.doNotRemoveConfigMap = true
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -224,13 +209,15 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
|
||||
}
|
||||
|
||||
return &api.TrafficFilteringOptions{
|
||||
PlainTextMaskingRegexes: compiledRegexSlice,
|
||||
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
|
||||
DisableRedaction: config.Config.Tap.DisableRedaction,
|
||||
PlainTextMaskingRegexes: compiledRegexSlice,
|
||||
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
|
||||
DisableRedaction: config.Config.Tap.DisableRedaction,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
|
||||
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
|
||||
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
||||
|
||||
if len(nodeToTappedPodIPMap) > 0 {
|
||||
var serviceAccountName string
|
||||
if state.mizuServiceAccountExists {
|
||||
@ -268,75 +255,108 @@ func finishMizuExecution(kubernetesProvider *kubernetes.Provider) {
|
||||
telemetry.ReportAPICalls()
|
||||
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
|
||||
defer cancel()
|
||||
dumpLogsIfNeeded(kubernetesProvider, removalCtx)
|
||||
cleanUpMizuResources(kubernetesProvider, removalCtx, cancel)
|
||||
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
|
||||
cleanUpMizuResources(removalCtx, cancel, kubernetesProvider)
|
||||
}
|
||||
|
||||
func dumpLogsIfNeeded(kubernetesProvider *kubernetes.Provider, removalCtx context.Context) {
|
||||
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {
|
||||
if !config.Config.DumpLogs {
|
||||
return
|
||||
}
|
||||
mizuDir := mizu.GetMizuFolderPath()
|
||||
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
|
||||
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
|
||||
if err := fsUtils.DumpLogs(ctx, kubernetesProvider, filePath); err != nil {
|
||||
logger.Log.Errorf("Failed dump logs %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider, removalCtx context.Context, cancel context.CancelFunc) {
|
||||
func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
|
||||
logger.Log.Infof("\nRemoving mizu resources\n")
|
||||
|
||||
if !config.Config.IsNsRestrictedMode() {
|
||||
if err := kubernetesProvider.RemoveNamespace(removalCtx, config.Config.MizuResourcesNamespace); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
var leftoverResources []string
|
||||
|
||||
if config.Config.IsNsRestrictedMode() {
|
||||
leftoverResources = cleanUpRestrictedMode(ctx, kubernetesProvider)
|
||||
} else {
|
||||
if err := kubernetesProvider.RemovePod(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveService(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if !state.doNotRemoveConfigMap {
|
||||
if err := kubernetesProvider.RemoveConfigMap(removalCtx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
}
|
||||
|
||||
leftoverResources = cleanUpNonRestrictedMode(ctx, cancel, kubernetesProvider)
|
||||
}
|
||||
|
||||
if state.mizuServiceAccountExists {
|
||||
if !config.Config.IsNsRestrictedMode() {
|
||||
if err := kubernetesProvider.RemoveNonNamespacedResources(removalCtx, mizu.ClusterRoleName, mizu.ClusterRoleBindingName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing non-namespaced resources: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := kubernetesProvider.RemoveServicAccount(removalCtx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveRole(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveRoleBinding(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
if len(leftoverResources) > 0 {
|
||||
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", logger.GetLogFilePath())
|
||||
for _, resource := range leftoverResources {
|
||||
errMsg += "\n- " + resource
|
||||
}
|
||||
logger.Log.Errorf(uiUtils.Error, errMsg)
|
||||
}
|
||||
}
|
||||
|
||||
func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string {
|
||||
leftoverResources := make([]string, 0)
|
||||
|
||||
if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("Pod %s in namespace %s", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
if !config.Config.IsNsRestrictedMode() {
|
||||
waitUntilNamespaceDeleted(removalCtx, cancel, kubernetesProvider)
|
||||
if err := kubernetesProvider.RemoveService(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("Service %s in namespace %s", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveDaemonSet(ctx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", mizu.ConfigMapName, config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveServicAccount(ctx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("Service Account %s in namespace %s", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("Role %s in namespace %s", mizu.RoleName, config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", mizu.RoleBindingName, config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
return leftoverResources
|
||||
}
|
||||
|
||||
func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) []string {
|
||||
leftoverResources := make([]string, 0)
|
||||
|
||||
if err := kubernetesProvider.RemoveNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
|
||||
resourceDesc := fmt.Sprintf("Namespace %s", config.Config.MizuResourcesNamespace)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
} else {
|
||||
defer waitUntilNamespaceDeleted(ctx, cancel, kubernetesProvider)
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveClusterRole(ctx, mizu.ClusterRoleName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("ClusterRole %s", mizu.ClusterRoleName)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveClusterRoleBinding(ctx, mizu.ClusterRoleBindingName); err != nil {
|
||||
resourceDesc := fmt.Sprintf("ClusterRoleBinding %s", mizu.ClusterRoleBindingName)
|
||||
handleDeletionError(err, resourceDesc, &leftoverResources)
|
||||
}
|
||||
|
||||
return leftoverResources
|
||||
}
|
||||
|
||||
func handleDeletionError(err error, resourceDesc string, leftoverResources *[]string) {
|
||||
logger.Log.Debugf("Error removing %s: %v", resourceDesc, errormessage.FormatError(err))
|
||||
*leftoverResources = append(*leftoverResources, resourceDesc)
|
||||
}
|
||||
|
||||
func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
|
||||
@ -376,13 +396,8 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
|
||||
cancel()
|
||||
}
|
||||
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
|
||||
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
@ -500,7 +515,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
||||
return missingPods
|
||||
}
|
||||
|
||||
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
|
||||
isPodReady := false
|
||||
@ -530,16 +545,38 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
||||
|
||||
if modifiedPod.Status.Phase == core.PodPending {
|
||||
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
||||
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", logger.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
|
||||
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
|
||||
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", logger.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
||||
|
||||
url := GetApiServerUrl()
|
||||
if err := apiserver.Provider.InitAndTestConnection(url); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
|
||||
cancel()
|
||||
}
|
||||
|
||||
logger.Log.Infof("Mizu is available at %s\n", url)
|
||||
openBrowser(url)
|
||||
requestForAnalysisIfNeeded()
|
||||
@ -547,13 +584,13 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||
}
|
||||
}
|
||||
case _, ok := <-errorChan:
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
errorChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
|
||||
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
|
||||
cancel()
|
||||
|
||||
case <-timeAfter:
|
||||
@ -568,6 +605,72 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
}
|
||||
}
|
||||
|
||||
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", mizu.TapperDaemonSetName))
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
|
||||
var prevPodPhase core.PodPhase
|
||||
for {
|
||||
select {
|
||||
case addedPod, ok := <-added:
|
||||
if !ok {
|
||||
added = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
|
||||
case removedPod, ok := <-removed:
|
||||
if !ok {
|
||||
removed = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
|
||||
case modifiedPod, ok := <-modified:
|
||||
if !ok {
|
||||
modified = nil
|
||||
continue
|
||||
}
|
||||
|
||||
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
||||
logger.Log.Infof(uiUtils.Red, "Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message)
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
|
||||
podStatus := modifiedPod.Status
|
||||
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
|
||||
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
|
||||
continue
|
||||
}
|
||||
prevPodPhase = podStatus.Phase
|
||||
|
||||
if podStatus.Phase == core.PodRunning {
|
||||
state := podStatus.ContainerStatuses[0].State
|
||||
if state.Terminated != nil {
|
||||
switch state.Terminated.Reason {
|
||||
case "OOMKilled":
|
||||
logger.Log.Infof(uiUtils.Red, "Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
errorChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("[Error] Error in mizu tapper watch, err: %v", err)
|
||||
cancel()
|
||||
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debugf("Watching tapper pod loop, ctx done")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func requestForAnalysisIfNeeded() {
|
||||
if !config.Config.Tap.Analysis {
|
||||
return
|
||||
@ -609,7 +712,7 @@ func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
|
||||
if config.Config.Tap.AllNamespaces {
|
||||
return []string{mizu.K8sAllNamespaces}
|
||||
} else if len(config.Config.Tap.Namespaces) > 0 {
|
||||
return mizu.Unique(config.Config.Tap.Namespaces)
|
||||
return shared.Unique(config.Config.Tap.Namespaces)
|
||||
} else {
|
||||
return []string{kubernetesProvider.CurrentNamespace()}
|
||||
}
|
||||
|
@ -25,4 +25,7 @@ func init() {
|
||||
defaults.Set(&defaultViewConfig)
|
||||
|
||||
viewCmd.Flags().Uint16P(configStructs.GuiPortViewName, "p", defaultViewConfig.GuiPort, "Provide a custom port for the web interface webserver")
|
||||
viewCmd.Flags().StringP(configStructs.UrlViewName, "u", defaultViewConfig.Url, "Provide a custom host")
|
||||
|
||||
viewCmd.Flags().MarkHidden(configStructs.UrlViewName)
|
||||
}
|
||||
|
@ -24,34 +24,39 @@ func runMizuView() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed to found mizu service %v", err)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
if !exists {
|
||||
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
url := config.Config.View.Url
|
||||
|
||||
url := GetApiServerUrl()
|
||||
if url == "" {
|
||||
exists, err := kubernetesProvider.DoesServicesExist(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed to found mizu service %v", err)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
if !exists {
|
||||
logger.Log.Infof("%s service not found, you should run `mizu tap` command first", mizu.ApiServerPodName)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
response, err := http.Get(fmt.Sprintf("%s/", url))
|
||||
if err == nil && response.StatusCode == 200 {
|
||||
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
|
||||
return
|
||||
}
|
||||
logger.Log.Infof("Establishing connection to k8s cluster...")
|
||||
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
||||
url = GetApiServerUrl()
|
||||
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
||||
return
|
||||
response, err := http.Get(fmt.Sprintf("%s/", url))
|
||||
if err == nil && response.StatusCode == 200 {
|
||||
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
|
||||
return
|
||||
}
|
||||
logger.Log.Infof("Establishing connection to k8s cluster...")
|
||||
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
||||
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log.Infof("Mizu is available at %s\n", url)
|
||||
|
||||
openBrowser(url)
|
||||
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
|
||||
logger.Log.Errorf("Failed to check versions compatibility %v", err)
|
||||
|
@ -4,7 +4,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
@ -89,7 +89,7 @@ func initFlag(f *pflag.Flag) {
|
||||
configElemValue := reflect.ValueOf(&Config).Elem()
|
||||
|
||||
var flagPath []string
|
||||
if mizu.Contains([]string{ConfigFilePathCommandName}, f.Name) {
|
||||
if shared.Contains([]string{ConfigFilePathCommandName}, f.Name) {
|
||||
flagPath = []string{f.Name}
|
||||
} else {
|
||||
flagPath = []string{cmdName, f.Name}
|
||||
|
@ -17,26 +17,24 @@ const (
|
||||
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
|
||||
DryRunTapName = "dry-run"
|
||||
EnforcePolicyFile = "traffic-validation-file"
|
||||
EnforcePolicyFileDeprecated = "test-rules"
|
||||
)
|
||||
|
||||
type TapConfig struct {
|
||||
AnalysisDestination string `yaml:"dest" default:"up9.app"`
|
||||
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
|
||||
PodRegexStr string `yaml:"regex" default:".*"`
|
||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||
Namespaces []string `yaml:"namespaces"`
|
||||
Analysis bool `yaml:"analysis" default:"false"`
|
||||
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
|
||||
PlainTextFilterRegexes []string `yaml:"regex-masking"`
|
||||
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
|
||||
DisableRedaction bool `yaml:"no-redact" default:"false"`
|
||||
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
|
||||
DryRun bool `yaml:"dry-run" default:"false"`
|
||||
EnforcePolicyFile string `yaml:"traffic-validation-file"`
|
||||
EnforcePolicyFileDeprecated string `yaml:"test-rules"`
|
||||
ApiServerResources Resources `yaml:"api-server-resources"`
|
||||
TapperResources Resources `yaml:"tapper-resources"`
|
||||
AnalysisDestination string `yaml:"dest" default:"up9.app"`
|
||||
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
|
||||
PodRegexStr string `yaml:"regex" default:".*"`
|
||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||
Namespaces []string `yaml:"namespaces"`
|
||||
Analysis bool `yaml:"analysis" default:"false"`
|
||||
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
|
||||
PlainTextFilterRegexes []string `yaml:"regex-masking"`
|
||||
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
|
||||
DisableRedaction bool `yaml:"no-redact" default:"false"`
|
||||
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
|
||||
DryRun bool `yaml:"dry-run" default:"false"`
|
||||
EnforcePolicyFile string `yaml:"traffic-validation-file"`
|
||||
ApiServerResources Resources `yaml:"api-server-resources"`
|
||||
TapperResources Resources `yaml:"tapper-resources"`
|
||||
}
|
||||
|
||||
type Resources struct {
|
||||
|
@ -2,8 +2,10 @@ package configStructs
|
||||
|
||||
const (
|
||||
GuiPortViewName = "gui-port"
|
||||
UrlViewName = "url"
|
||||
)
|
||||
|
||||
type ViewConfig struct {
|
||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||
Url string `yaml:"url,omitempty" readonly:""`
|
||||
}
|
||||
|
@ -268,67 +268,21 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s
|
||||
return provider.clientSet.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{})
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesServiceAccountExist(ctx context.Context, namespace string, serviceAccountName string) (bool, error) {
|
||||
serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, serviceAccountName, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(serviceAccount, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesConfigMapExist(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesNamespaceExist(ctx context.Context, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.CoreV1().Namespaces().Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesClusterRoleExist(ctx context.Context, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.RbacV1().ClusterRoles().Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesClusterRoleBindingExist(ctx context.Context, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.RbacV1().ClusterRoleBindings().Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesRoleExist(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.RbacV1().Roles(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesRoleBindingExist(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.RbacV1().RoleBindings(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesPodExist(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) DoesDaemonSetExist(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
resource, err := provider.clientSet.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
|
||||
return provider.doesResourceExist(resource, err)
|
||||
}
|
||||
|
||||
func (provider *Provider) doesResourceExist(resource interface{}, err error) (bool, error) {
|
||||
var statusError *k8serrors.StatusError
|
||||
if errors.As(err, &statusError) {
|
||||
// expected behavior when resource does not exist
|
||||
if statusError.ErrStatus.Reason == metav1.StatusReasonNotFound {
|
||||
return false, nil
|
||||
}
|
||||
// Getting NotFound error is the expected behavior when a resource does not exist.
|
||||
if k8serrors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return resource != nil, nil
|
||||
}
|
||||
|
||||
@ -441,115 +395,63 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context,
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveNamespace(ctx context.Context, name string) error {
|
||||
if isFound, err := provider.DoesNamespaceExist(ctx, name); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveNonNamespacedResources(ctx context.Context, clusterRoleName string, clusterRoleBindingName string) error {
|
||||
if err := provider.RemoveClusterRole(ctx, clusterRoleName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.RemoveClusterRoleBinding(ctx, clusterRoleBindingName); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
err := provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveClusterRole(ctx context.Context, name string) error {
|
||||
if isFound, err := provider.DoesClusterRoleExist(ctx, name); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.RbacV1().ClusterRoles().Delete(ctx, name, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.RbacV1().ClusterRoles().Delete(ctx, name, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveClusterRoleBinding(ctx context.Context, name string) error {
|
||||
if isFound, err := provider.DoesClusterRoleBindingExist(ctx, name); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.RbacV1().ClusterRoleBindings().Delete(ctx, name, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.RbacV1().ClusterRoleBindings().Delete(ctx, name, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveRoleBinding(ctx context.Context, namespace string, name string) error {
|
||||
if isFound, err := provider.DoesRoleBindingExist(ctx, namespace, name); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.RbacV1().RoleBindings(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.RbacV1().RoleBindings(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveRole(ctx context.Context, namespace string, name string) error {
|
||||
if isFound, err := provider.DoesRoleExist(ctx, namespace, name); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.RbacV1().Roles(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.RbacV1().Roles(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveServicAccount(ctx context.Context, namespace string, name string) error {
|
||||
if isFound, err := provider.DoesServiceAccountExist(ctx, namespace, name); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
|
||||
if isFound, err := provider.DoesPodExist(ctx, namespace, podName); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveConfigMap(ctx context.Context, namespace string, configMapName string) error {
|
||||
if isFound, err := provider.DoesConfigMapExist(ctx, namespace, configMapName); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
|
||||
if isFound, err := provider.DoesServicesExist(ctx, namespace, serviceName); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
|
||||
err := provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
|
||||
if isFound, err := provider.DoesDaemonSetExist(ctx, namespace, daemonSetName); err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
err := provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
|
||||
return provider.handleRemovalError(err)
|
||||
}
|
||||
|
||||
func (provider *Provider) handleRemovalError(err error) error {
|
||||
// Ignore NotFound - There is nothing to delete.
|
||||
// Ignore Forbidden - Assume that a user could not have created the resource in the first place.
|
||||
if k8serrors.IsNotFound(err) || k8serrors.IsForbidden(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, data string) error {
|
||||
@ -731,7 +633,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
|
||||
return matchingPods, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) GetPodLogs(namespace string, podName string, ctx context.Context) (string, error) {
|
||||
func (provider *Provider) GetPodLogs(ctx context.Context, namespace string, podName string) (string, error) {
|
||||
podLogOpts := core.PodLogOptions{}
|
||||
req := provider.clientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts)
|
||||
podLogs, err := req.Stream(ctx)
|
||||
@ -747,7 +649,7 @@ func (provider *Provider) GetPodLogs(namespace string, podName string, ctx conte
|
||||
return str, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) GetNamespaceEvents(namespace string, ctx context.Context) (string, error) {
|
||||
func (provider *Provider) GetNamespaceEvents(ctx context.Context, namespace string) (string, error) {
|
||||
eventsOpts := metav1.ListOptions{}
|
||||
eventList, err := provider.clientSet.CoreV1().Events(namespace).List(ctx, eventsOpts)
|
||||
if err != nil {
|
||||
|
@ -33,7 +33,10 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
|
||||
return
|
||||
}
|
||||
|
||||
pod := e.Object.(*corev1.Pod)
|
||||
pod, ok := e.Object.(*corev1.Pod)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if !podFilter.MatchString(pod.Name) {
|
||||
continue
|
||||
|
@ -12,7 +12,7 @@ import (
|
||||
"regexp"
|
||||
)
|
||||
|
||||
func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath string) error {
|
||||
func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath string) error {
|
||||
podExactRegex := regexp.MustCompile("^" + mizu.MizuResourcesPrefix)
|
||||
pods, err := provider.ListAllPodsMatchingRegex(ctx, podExactRegex, []string{config.Config.MizuResourcesNamespace})
|
||||
if err != nil {
|
||||
@ -32,7 +32,7 @@ func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath strin
|
||||
defer zipWriter.Close()
|
||||
|
||||
for _, pod := range pods {
|
||||
logs, err := provider.GetPodLogs(pod.Namespace, pod.Name, ctx)
|
||||
logs, err := provider.GetPodLogs(ctx, pod.Namespace, pod.Name)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed to get logs, %v", err)
|
||||
continue
|
||||
@ -47,7 +47,7 @@ func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath strin
|
||||
}
|
||||
}
|
||||
|
||||
events, err := provider.GetNamespaceEvents(config.Config.MizuResourcesNamespace, ctx)
|
||||
events, err := provider.GetNamespaceEvents(ctx, config.Config.MizuResourcesNamespace)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Failed to get k8b events, %v", err)
|
||||
} else {
|
||||
|
@ -31,12 +31,12 @@ mizu tap --traffic-validation-file rules.yaml
|
||||
The structure of the traffic-validation-file is:
|
||||
|
||||
* `name`: string, name of the rule
|
||||
* `type`: string, type of the rule, must be `json` or `header` or `latency`
|
||||
* `type`: string, type of the rule, must be `json` or `header` or `slo`
|
||||
* `key`: string, [jsonpath](https://code.google.com/archive/p/jsonpath/wikis/Javascript.wiki) used only in `json` or `header` type
|
||||
* `value`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) used only in `json` or `header` type
|
||||
* `service`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) service name to filter
|
||||
* `path`: string, [regex](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions) URL path to filter
|
||||
* `latency`: integer, time in ms of the expected latency.
|
||||
* `response-time`: integer, time in ms of the expected latency.
|
||||
|
||||
|
||||
### For example:
|
||||
@ -54,8 +54,8 @@ rules:
|
||||
key: "Content-Le.*"
|
||||
value: "(\\d+(?:\\.\\d+)?)"
|
||||
- name: latency-test
|
||||
type: latency
|
||||
latency: 1
|
||||
type: slo
|
||||
response-time: 1
|
||||
service: "carts.*"
|
||||
```
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
@ -83,14 +83,14 @@ type RulesPolicy struct {
|
||||
}
|
||||
|
||||
type RulePolicy struct {
|
||||
Type string `yaml:"type"`
|
||||
Service string `yaml:"service"`
|
||||
Path string `yaml:"path"`
|
||||
Method string `yaml:"method"`
|
||||
Key string `yaml:"key"`
|
||||
Value string `yaml:"value"`
|
||||
Latency int64 `yaml:"latency"`
|
||||
Name string `yaml:"name"`
|
||||
Type string `yaml:"type"`
|
||||
Service string `yaml:"service"`
|
||||
Path string `yaml:"path"`
|
||||
Method string `yaml:"method"`
|
||||
Key string `yaml:"key"`
|
||||
Value string `yaml:"value"`
|
||||
ResponseTime int64 `yaml:"response-time"`
|
||||
Name string `yaml:"name"`
|
||||
}
|
||||
|
||||
type RulesMatched struct {
|
||||
@ -99,14 +99,17 @@ type RulesMatched struct {
|
||||
}
|
||||
|
||||
func (r *RulePolicy) validateType() bool {
|
||||
permitedTypes := []string{"json", "header", "latency"}
|
||||
permitedTypes := []string{"json", "header", "slo"}
|
||||
_, found := Find(permitedTypes, r.Type)
|
||||
if !found {
|
||||
fmt.Printf("\nRule with name %s will be ignored. Err: only json, header and latency types are supported on rule definition.\n", r.Name)
|
||||
log.Printf("Error: %s. ", r.Name)
|
||||
log.Printf("Only json, header and slo types are supported on rule definition. This rule will be ignored\n")
|
||||
found = false
|
||||
}
|
||||
if strings.ToLower(r.Type) == "latency" {
|
||||
if r.Latency == 0 {
|
||||
fmt.Printf("\nRule with name %s will be ignored. Err: when type=latency, the field Latency should be specified and have a value >= 1\n\n", r.Name)
|
||||
if strings.ToLower(r.Type) == "slo" {
|
||||
if r.ResponseTime <= 0 {
|
||||
log.Printf("Error: %s. ", r.Name)
|
||||
log.Printf("When type=slo, the field response-time should be specified and have a value >= 1\n\n")
|
||||
found = false
|
||||
}
|
||||
}
|
||||
@ -124,10 +127,6 @@ func (rules *RulesPolicy) ValidateRulesPolicy() []int {
|
||||
return invalidIndex
|
||||
}
|
||||
|
||||
func (rules *RulesPolicy) RemoveRule(idx int) {
|
||||
rules.Rules = append(rules.Rules[:idx], rules.Rules[idx+1:]...)
|
||||
}
|
||||
|
||||
func Find(slice []string, val string) (int, bool) {
|
||||
for i, item := range slice {
|
||||
if item == val {
|
||||
@ -148,10 +147,15 @@ func DecodeEnforcePolicy(path string) (RulesPolicy, error) {
|
||||
return enforcePolicy, err
|
||||
}
|
||||
invalidIndex := enforcePolicy.ValidateRulesPolicy()
|
||||
var k = 0
|
||||
if len(invalidIndex) != 0 {
|
||||
for i := range invalidIndex {
|
||||
enforcePolicy.RemoveRule(invalidIndex[i])
|
||||
for i, rule := range enforcePolicy.Rules {
|
||||
if !ContainsInt(invalidIndex, i) {
|
||||
enforcePolicy.Rules[k] = rule
|
||||
k++
|
||||
}
|
||||
}
|
||||
enforcePolicy.Rules = enforcePolicy.Rules[:k]
|
||||
}
|
||||
return enforcePolicy, nil
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package mizu
|
||||
package shared
|
||||
|
||||
func Contains(slice []string, containsValue string) bool {
|
||||
for _, sliceValue := range slice {
|
||||
@ -10,6 +10,16 @@ func Contains(slice []string, containsValue string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func ContainsInt(slice []int, containsValue int) bool {
|
||||
for _, sliceValue := range slice {
|
||||
if sliceValue == containsValue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
func Unique(slice []string) []string {
|
||||
keys := make(map[string]bool)
|
||||
var list []string
|
@ -1,8 +1,8 @@
|
||||
package mizu_test
|
||||
package shared_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
@ -21,7 +21,7 @@ func TestContainsExists(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.ContainsValue, func(t *testing.T) {
|
||||
actual := mizu.Contains(test.Slice, test.ContainsValue)
|
||||
actual := shared.Contains(test.Slice, test.ContainsValue)
|
||||
if actual != test.Expected {
|
||||
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
|
||||
}
|
||||
@ -43,7 +43,7 @@ func TestContainsNotExists(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.ContainsValue, func(t *testing.T) {
|
||||
actual := mizu.Contains(test.Slice, test.ContainsValue)
|
||||
actual := shared.Contains(test.Slice, test.ContainsValue)
|
||||
if actual != test.Expected {
|
||||
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
|
||||
}
|
||||
@ -63,7 +63,7 @@ func TestContainsEmptySlice(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.ContainsValue, func(t *testing.T) {
|
||||
actual := mizu.Contains(test.Slice, test.ContainsValue)
|
||||
actual := shared.Contains(test.Slice, test.ContainsValue)
|
||||
if actual != test.Expected {
|
||||
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
|
||||
}
|
||||
@ -83,7 +83,7 @@ func TestContainsNilSlice(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.ContainsValue, func(t *testing.T) {
|
||||
actual := mizu.Contains(test.Slice, test.ContainsValue)
|
||||
actual := shared.Contains(test.Slice, test.ContainsValue)
|
||||
if actual != test.Expected {
|
||||
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
|
||||
}
|
||||
@ -102,7 +102,7 @@ func TestUniqueNoDuplicateValues(t *testing.T) {
|
||||
|
||||
for index, test := range tests {
|
||||
t.Run(fmt.Sprintf("%v", index), func(t *testing.T) {
|
||||
actual := mizu.Unique(test.Slice)
|
||||
actual := shared.Unique(test.Slice)
|
||||
if !reflect.DeepEqual(test.Expected, actual) {
|
||||
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
|
||||
}
|
||||
@ -121,7 +121,7 @@ func TestUniqueDuplicateValues(t *testing.T) {
|
||||
|
||||
for index, test := range tests {
|
||||
t.Run(fmt.Sprintf("%v", index), func(t *testing.T) {
|
||||
actual := mizu.Unique(test.Slice)
|
||||
actual := shared.Unique(test.Slice)
|
||||
if !reflect.DeepEqual(test.Expected, actual) {
|
||||
t.Errorf("unexpected result - Expected: %v, actual: %v", test.Expected, actual)
|
||||
}
|
@ -106,7 +106,7 @@ type MizuEntry struct {
|
||||
UpdatedAt time.Time
|
||||
ProtocolName string `json:"protocolName" gorm:"column:protocolName"`
|
||||
ProtocolLongName string `json:"protocolLongName" gorm:"column:protocolLongName"`
|
||||
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolVersion"`
|
||||
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolAbbreviation"`
|
||||
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
|
||||
ProtocolBackgroundColor string `json:"protocolBackgroundColor" gorm:"column:protocolBackgroundColor"`
|
||||
ProtocolForegroundColor string `json:"protocolForegroundColor" gorm:"column:protocolForegroundColor"`
|
||||
@ -138,6 +138,7 @@ type MizuEntryWrapper struct {
|
||||
BodySize int64 `json:"bodySize"`
|
||||
Data MizuEntry `json:"data"`
|
||||
Rules []map[string]interface{} `json:"rulesMatched,omitempty"`
|
||||
IsRulesEnabled bool `json:"isRulesEnabled"`
|
||||
}
|
||||
|
||||
type BaseEntryDetails struct {
|
||||
@ -156,7 +157,7 @@ type BaseEntryDetails struct {
|
||||
SourcePort string `json:"sourcePort,omitempty"`
|
||||
DestinationPort string `json:"destinationPort,omitempty"`
|
||||
IsOutgoing bool `json:"isOutgoing,omitempty"`
|
||||
Latency int64 `json:"latency,omitempty"`
|
||||
Latency int64 `json:"latency"`
|
||||
Rules ApplicableRules `json:"rules,omitempty"`
|
||||
}
|
||||
|
||||
@ -190,6 +191,7 @@ func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
|
||||
bed.Timestamp = entry.Timestamp
|
||||
bed.RequestSenderIp = entry.RequestSenderIp
|
||||
bed.IsOutgoing = entry.IsOutgoing
|
||||
bed.Latency = entry.ElapsedTime
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
package api
|
||||
|
||||
type TrafficFilteringOptions struct {
|
||||
HealthChecksUserAgentHeaders []string
|
||||
PlainTextMaskingRegexes []*SerializableRegexp
|
||||
DisableRedaction bool
|
||||
IgnoredUserAgents []string
|
||||
PlainTextMaskingRegexes []*SerializableRegexp
|
||||
DisableRedaction bool
|
||||
}
|
||||
|
@ -312,7 +312,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
SourcePort: entry.SourcePort,
|
||||
DestinationPort: entry.DestinationPort,
|
||||
IsOutgoing: entry.IsOutgoing,
|
||||
Latency: 0,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: api.ApplicableRules{
|
||||
Latency: 0,
|
||||
Status: false,
|
||||
|
@ -14,9 +14,14 @@ import (
|
||||
)
|
||||
|
||||
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||
if IsIgnoredUserAgent(item, options) {
|
||||
return
|
||||
}
|
||||
|
||||
if !options.DisableRedaction {
|
||||
FilterSensitiveData(item, options)
|
||||
}
|
||||
|
||||
emitter.Emit(item)
|
||||
}
|
||||
|
||||
|
@ -223,7 +223,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
SourcePort: entry.SourcePort,
|
||||
DestinationPort: entry.DestinationPort,
|
||||
IsOutgoing: entry.IsOutgoing,
|
||||
Latency: 0,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: api.ApplicableRules{
|
||||
Latency: 0,
|
||||
Status: false,
|
||||
|
@ -25,6 +25,30 @@ var personallyIdentifiableDataFields = []string{"token", "authorization", "authe
|
||||
"zip", "zipcode", "address", "country", "firstname", "lastname",
|
||||
"middlename", "fname", "lname", "birthdate"}
|
||||
|
||||
func IsIgnoredUserAgent(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) bool {
|
||||
if item.Protocol.Name != "http" {
|
||||
return false
|
||||
}
|
||||
|
||||
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
|
||||
|
||||
for headerKey, headerValues := range request.Header {
|
||||
if strings.ToLower(headerKey) == "user-agent" {
|
||||
for _, userAgent := range options.IgnoredUserAgents {
|
||||
for _, headerValue := range headerValues {
|
||||
if strings.Contains(strings.ToLower(headerValue), strings.ToLower(userAgent)) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
|
||||
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
|
||||
response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response)
|
||||
@ -86,6 +110,10 @@ func getContentTypeHeaderValue(headers http.Header) string {
|
||||
}
|
||||
|
||||
func isFieldNameSensitive(fieldName string) bool {
|
||||
if fieldName == ":authority" {
|
||||
return false
|
||||
}
|
||||
|
||||
name := strings.ToLower(fieldName)
|
||||
name = strings.ReplaceAll(name, "_", "")
|
||||
name = strings.ReplaceAll(name, "-", "")
|
||||
|
@ -186,7 +186,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
SourcePort: entry.SourcePort,
|
||||
DestinationPort: entry.DestinationPort,
|
||||
IsOutgoing: entry.IsOutgoing,
|
||||
Latency: 0,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: api.ApplicableRules{
|
||||
Latency: 0,
|
||||
Status: false,
|
||||
|
14
tap/extensions/redis/errors.go
Normal file
14
tap/extensions/redis/errors.go
Normal file
@ -0,0 +1,14 @@
|
||||
package main
|
||||
|
||||
//ConnectError redis connection error,such as io timeout
|
||||
type ConnectError struct {
|
||||
Message string
|
||||
}
|
||||
|
||||
func newConnectError(message string) *ConnectError {
|
||||
return &ConnectError{Message: message}
|
||||
}
|
||||
|
||||
func (e *ConnectError) Error() string {
|
||||
return e.Message
|
||||
}
|
9
tap/extensions/redis/go.mod
Normal file
9
tap/extensions/redis/go.mod
Normal file
@ -0,0 +1,9 @@
|
||||
module github.com/up9inc/mizu/tap/extensions/redis
|
||||
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
|
55
tap/extensions/redis/handlers.go
Normal file
55
tap/extensions/redis/handlers.go
Normal file
@ -0,0 +1,55 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
|
||||
counterPair.Request++
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
counterPair.Request,
|
||||
)
|
||||
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.SrcIP,
|
||||
ClientPort: tcpID.SrcPort,
|
||||
ServerIP: tcpID.DstIP,
|
||||
ServerPort: tcpID.DstPort,
|
||||
IsOutgoing: true,
|
||||
}
|
||||
emitter.Emit(item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
|
||||
counterPair.Response++
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
counterPair.Response,
|
||||
)
|
||||
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.DstIP,
|
||||
ClientPort: tcpID.DstPort,
|
||||
ServerIP: tcpID.SrcIP,
|
||||
ServerPort: tcpID.SrcPort,
|
||||
IsOutgoing: false,
|
||||
}
|
||||
emitter.Emit(item)
|
||||
}
|
||||
return nil
|
||||
}
|
57
tap/extensions/redis/helpers.go
Normal file
57
tap/extensions/redis/helpers.go
Normal file
@ -0,0 +1,57 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type RedisPayload struct {
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
type RedisPayloader interface {
|
||||
MarshalJSON() ([]byte, error)
|
||||
}
|
||||
|
||||
func (h RedisPayload) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(h.Data)
|
||||
}
|
||||
|
||||
type RedisWrapper struct {
|
||||
Method string `json:"method"`
|
||||
Url string `json:"url"`
|
||||
Details interface{} `json:"details"`
|
||||
}
|
||||
|
||||
func representGeneric(generic map[string]interface{}) (representation []interface{}) {
|
||||
details, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "Type",
|
||||
"value": generic["type"].(string),
|
||||
},
|
||||
{
|
||||
"name": "Command",
|
||||
"value": generic["command"].(string),
|
||||
},
|
||||
{
|
||||
"name": "Key",
|
||||
"value": generic["key"].(string),
|
||||
},
|
||||
{
|
||||
"name": "Value",
|
||||
"value": generic["value"].(string),
|
||||
},
|
||||
{
|
||||
"name": "Keyword",
|
||||
"value": generic["keyword"].(string),
|
||||
},
|
||||
})
|
||||
representation = append(representation, map[string]string{
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
|
||||
return
|
||||
}
|
154
tap/extensions/redis/main.go
Normal file
154
tap/extensions/redis/main.go
Normal file
@ -0,0 +1,154 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var protocol api.Protocol = api.Protocol{
|
||||
Name: "redis",
|
||||
LongName: "Redis Serialization Protocol",
|
||||
Abbreviation: "REDIS",
|
||||
Version: "3.x",
|
||||
BackgroundColor: "#a41e11",
|
||||
ForegroundColor: "#ffffff",
|
||||
FontSize: 11,
|
||||
ReferenceLink: "https://redis.io/topics/protocol",
|
||||
Ports: []string{"6379"},
|
||||
Priority: 3,
|
||||
}
|
||||
|
||||
func init() {
|
||||
log.Println("Initializing Redis extension...")
|
||||
}
|
||||
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
is := &RedisInputStream{
|
||||
Reader: b,
|
||||
Buf: make([]byte, 8192),
|
||||
}
|
||||
proto := NewProtocol(is)
|
||||
for {
|
||||
redisPacket, err := proto.Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isClient {
|
||||
handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
|
||||
} else {
|
||||
handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
service := "redis"
|
||||
if resolvedDestination != "" {
|
||||
service = resolvedDestination
|
||||
} else if resolvedSource != "" {
|
||||
service = resolvedSource
|
||||
}
|
||||
|
||||
method := ""
|
||||
if reqDetails["command"] != nil {
|
||||
method = reqDetails["command"].(string)
|
||||
}
|
||||
|
||||
summary := ""
|
||||
if reqDetails["key"] != nil {
|
||||
summary = reqDetails["key"].(string)
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
return &api.MizuEntry{
|
||||
ProtocolName: protocol.Name,
|
||||
ProtocolLongName: protocol.LongName,
|
||||
ProtocolAbbreviation: protocol.Abbreviation,
|
||||
ProtocolVersion: protocol.Version,
|
||||
ProtocolBackgroundColor: protocol.BackgroundColor,
|
||||
ProtocolForegroundColor: protocol.ForegroundColor,
|
||||
ProtocolFontSize: protocol.FontSize,
|
||||
ProtocolReferenceLink: protocol.ReferenceLink,
|
||||
EntryId: entryId,
|
||||
Entry: string(entryBytes),
|
||||
Url: fmt.Sprintf("%s%s", service, summary),
|
||||
Method: method,
|
||||
Status: 0,
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: 0,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
SourceIp: item.ConnectionInfo.ClientIP,
|
||||
DestinationIp: item.ConnectionInfo.ServerIP,
|
||||
SourcePort: item.ConnectionInfo.ClientPort,
|
||||
DestinationPort: item.ConnectionInfo.ServerPort,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
return &api.BaseEntryDetails{
|
||||
Id: entry.EntryId,
|
||||
Protocol: protocol,
|
||||
Url: entry.Url,
|
||||
RequestSenderIp: entry.RequestSenderIp,
|
||||
Service: entry.Service,
|
||||
Summary: entry.Path,
|
||||
StatusCode: entry.Status,
|
||||
Method: entry.Method,
|
||||
Timestamp: entry.Timestamp,
|
||||
SourceIp: entry.SourceIp,
|
||||
DestinationIp: entry.DestinationIp,
|
||||
SourcePort: entry.SourcePort,
|
||||
DestinationPort: entry.DestinationPort,
|
||||
IsOutgoing: entry.IsOutgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: api.ApplicableRules{
|
||||
Latency: 0,
|
||||
Status: false,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
|
||||
p = protocol
|
||||
bodySize = 0
|
||||
var root map[string]interface{}
|
||||
json.Unmarshal([]byte(entry.Entry), &root)
|
||||
representation := make(map[string]interface{}, 0)
|
||||
request := root["request"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||
response := root["response"].(map[string]interface{})["payload"].(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
resDetails := response["details"].(map[string]interface{})
|
||||
repRequest := representGeneric(reqDetails)
|
||||
repResponse := representGeneric(resDetails)
|
||||
representation["request"] = repRequest
|
||||
representation["response"] = repResponse
|
||||
object, err = json.Marshal(representation)
|
||||
return
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
102
tap/extensions/redis/matcher.go
Normal file
102
tap/extensions/redis/matcher.go
Normal file
@ -0,0 +1,102 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var reqResMatcher = createResponseRequestMatcher() // global
|
||||
|
||||
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
|
||||
type requestResponseMatcher struct {
|
||||
openMessagesMap *sync.Map
|
||||
}
|
||||
|
||||
func createResponseRequestMatcher() requestResponseMatcher {
|
||||
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
||||
return *newMatcher
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
requestRedisMessage := api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
Payload: RedisPayload{
|
||||
Data: &RedisWrapper{
|
||||
Method: string(request.Command),
|
||||
Url: "",
|
||||
Details: request,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
responseRedisMessage := response.(*api.GenericMessage)
|
||||
if responseRedisMessage.IsRequest {
|
||||
return nil
|
||||
}
|
||||
return matcher.preparePair(&requestRedisMessage, responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &requestRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
responseRedisMessage := api.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
Payload: RedisPayload{
|
||||
Data: &RedisWrapper{
|
||||
Method: string(response.Command),
|
||||
Url: "",
|
||||
Details: response,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
requestRedisMessage := request.(*api.GenericMessage)
|
||||
if !requestRedisMessage.IsRequest {
|
||||
return nil
|
||||
}
|
||||
return matcher.preparePair(requestRedisMessage, &responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &responseRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.GenericMessage, responseRedisMessage *api.GenericMessage) *api.OutputChannelItem {
|
||||
return &api.OutputChannelItem{
|
||||
Protocol: protocol,
|
||||
Timestamp: requestRedisMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
|
||||
ConnectionInfo: nil,
|
||||
Pair: &api.RequestResponsePair{
|
||||
Request: *requestRedisMessage,
|
||||
Response: *responseRedisMessage,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func splitIdent(ident string) []string {
|
||||
ident = strings.Replace(ident, "->", " ", -1)
|
||||
return strings.Split(ident, " ")
|
||||
}
|
||||
|
||||
func genKey(split []string) string {
|
||||
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
|
||||
return key
|
||||
}
|
474
tap/extensions/redis/read.go
Normal file
474
tap/extensions/redis/read.go
Normal file
@ -0,0 +1,474 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
askPrefix = "ASK "
|
||||
movedPrefix = "MOVED "
|
||||
clusterDownPrefix = "CLUSTERDOWN "
|
||||
busyPrefix = "BUSY "
|
||||
noscriptPrefix = "NOSCRIPT "
|
||||
|
||||
defaultHost = "localhost"
|
||||
defaultPort = 6379
|
||||
defaultSentinelPort = 26379
|
||||
defaultTimeout = 5 * time.Second
|
||||
defaultDatabase = 2 * time.Second
|
||||
|
||||
dollarByte = '$'
|
||||
asteriskByte = '*'
|
||||
plusByte = '+'
|
||||
minusByte = '-'
|
||||
colonByte = ':'
|
||||
notApplicableByte = '0'
|
||||
|
||||
sentinelMasters = "masters"
|
||||
sentinelGetMasterAddrByName = "get-master-addr-by-name"
|
||||
sentinelReset = "reset"
|
||||
sentinelSlaves = "slaves"
|
||||
sentinelFailOver = "failover"
|
||||
sentinelMonitor = "monitor"
|
||||
sentinelRemove = "remove"
|
||||
sentinelSet = "set"
|
||||
|
||||
clusterNodes = "nodes"
|
||||
clusterMeet = "meet"
|
||||
clusterReset = "reset"
|
||||
clusterAddSlots = "addslots"
|
||||
clusterDelSlots = "delslots"
|
||||
clusterInfo = "info"
|
||||
clusterGetKeysInSlot = "getkeysinslot"
|
||||
clusterSetSlot = "setslot"
|
||||
clusterSetSlotNode = "node"
|
||||
clusterSetSlotMigrating = "migrating"
|
||||
clusterSetSlotImporting = "importing"
|
||||
clusterSetSlotStable = "stable"
|
||||
clusterForget = "forget"
|
||||
clusterFlushSlot = "flushslots"
|
||||
clusterKeySlot = "keyslot"
|
||||
clusterCountKeyInSlot = "countkeysinslot"
|
||||
clusterSaveConfig = "saveconfig"
|
||||
clusterReplicate = "replicate"
|
||||
clusterSlaves = "slaves"
|
||||
clusterFailOver = "failover"
|
||||
clusterSlots = "slots"
|
||||
pubSubChannels = "channels"
|
||||
pubSubNumSub = "numsub"
|
||||
pubSubNumPat = "numpat"
|
||||
)
|
||||
|
||||
//intToByteArr convert int to byte array
|
||||
func intToByteArr(a int) []byte {
|
||||
buf := make([]byte, 0)
|
||||
return strconv.AppendInt(buf, int64(a), 10)
|
||||
}
|
||||
|
||||
var (
|
||||
bytesTrue = intToByteArr(1)
|
||||
bytesFalse = intToByteArr(0)
|
||||
bytesTilde = []byte("~")
|
||||
|
||||
positiveInfinityBytes = []byte("+inf")
|
||||
negativeInfinityBytes = []byte("-inf")
|
||||
)
|
||||
|
||||
var (
|
||||
sizeTable = []int{9, 99, 999, 9999, 99999, 999999, 9999999, 99999999,
|
||||
999999999, math.MaxInt32}
|
||||
|
||||
digitTens = []byte{'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1',
|
||||
'1', '1', '1', '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2',
|
||||
'2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4',
|
||||
'4', '4', '4', '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6',
|
||||
'6', '6', '6', '6', '6', '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8',
|
||||
'8', '8', '8', '8', '8', '8', '8', '9', '9', '9', '9', '9', '9', '9', '9', '9', '9'}
|
||||
|
||||
digitOnes = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
|
||||
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8',
|
||||
'9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6',
|
||||
'7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4',
|
||||
'5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2',
|
||||
'3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
|
||||
|
||||
digits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a',
|
||||
'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
|
||||
't', 'u', 'v', 'w', 'x', 'y', 'z'}
|
||||
)
|
||||
|
||||
// receive message from redis
|
||||
type RedisInputStream struct {
|
||||
*bufio.Reader
|
||||
Buf []byte
|
||||
count int
|
||||
limit int
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readByte() (byte, error) {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
ret := r.Buf[r.count]
|
||||
r.count++
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) ensureFill() error {
|
||||
if r.count < r.limit {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
r.limit, err = r.Read(r.Buf)
|
||||
if err != nil {
|
||||
return newConnectError(err.Error())
|
||||
}
|
||||
r.count = 0
|
||||
if r.limit == -1 {
|
||||
return newConnectError("Unexpected end of stream")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readLine() (string, error) {
|
||||
buf := ""
|
||||
for {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
b := r.Buf[r.count]
|
||||
r.count++
|
||||
if b == '\r' {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
c := r.Buf[r.count]
|
||||
r.count++
|
||||
if c == '\n' {
|
||||
break
|
||||
}
|
||||
buf += string(b)
|
||||
buf += string(c)
|
||||
} else {
|
||||
buf += string(b)
|
||||
}
|
||||
}
|
||||
if buf == "" {
|
||||
return "", newConnectError("It seems like server has closed the connection.")
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readLineBytes() ([]byte, error) {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pos := r.count
|
||||
buf := r.Buf
|
||||
for {
|
||||
if pos == r.limit {
|
||||
return r.readLineBytesSlowly()
|
||||
}
|
||||
p := buf[pos]
|
||||
pos++
|
||||
if p == '\r' {
|
||||
if pos == r.limit {
|
||||
return r.readLineBytesSlowly()
|
||||
}
|
||||
p := buf[pos]
|
||||
pos++
|
||||
if p == '\n' {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
N := pos - r.count - 2
|
||||
line := make([]byte, N)
|
||||
j := 0
|
||||
for i := r.count; i <= N; i++ {
|
||||
line[j] = buf[i]
|
||||
j++
|
||||
}
|
||||
r.count = pos
|
||||
return line, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readLineBytesSlowly() ([]byte, error) {
|
||||
buf := make([]byte, 0)
|
||||
for {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b := r.Buf[r.count]
|
||||
r.count++
|
||||
if b == 'r' {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := r.Buf[r.count]
|
||||
r.count++
|
||||
if c == '\n' {
|
||||
break
|
||||
}
|
||||
buf = append(buf, b)
|
||||
buf = append(buf, c)
|
||||
} else {
|
||||
buf = append(buf, b)
|
||||
}
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (r *RedisInputStream) readIntCrLf() (int64, error) {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
buf := r.Buf
|
||||
isNeg := false
|
||||
if buf[r.count] == '-' {
|
||||
isNeg = true
|
||||
}
|
||||
if isNeg {
|
||||
r.count++
|
||||
}
|
||||
value := int64(0)
|
||||
for {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
b := buf[r.count]
|
||||
r.count++
|
||||
if b == '\r' {
|
||||
err := r.ensureFill()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
c := buf[r.count]
|
||||
r.count++
|
||||
if c != '\n' {
|
||||
return 0, newConnectError("Unexpected character!")
|
||||
}
|
||||
break
|
||||
} else {
|
||||
value = value*10 + int64(b) - int64('0')
|
||||
}
|
||||
}
|
||||
if isNeg {
|
||||
return -value, nil
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
type RedisProtocol struct {
|
||||
is *RedisInputStream
|
||||
}
|
||||
|
||||
func NewProtocol(is *RedisInputStream) *RedisProtocol {
|
||||
return &RedisProtocol{
|
||||
is: is,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
|
||||
x, r, err := p.process()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
packet = &RedisPacket{}
|
||||
packet.Type = r
|
||||
|
||||
switch x.(type) {
|
||||
case []interface{}:
|
||||
array := x.([]interface{})
|
||||
packet.Command = RedisCommand(strings.ToUpper(string(array[0].([]uint8))))
|
||||
if len(array) > 1 {
|
||||
packet.Key = string(array[1].([]uint8))
|
||||
}
|
||||
if len(array) > 2 {
|
||||
packet.Value = string(array[2].([]uint8))
|
||||
}
|
||||
case []uint8:
|
||||
val := string(x.([]uint8))
|
||||
if packet.Type == types[plusByte] {
|
||||
packet.Keyword = RedisKeyword(strings.ToUpper(val))
|
||||
if !isValidRedisKeyword(keywords, packet.Keyword) {
|
||||
err = errors.New(fmt.Sprintf("Unrecognized keyword: %s", string(packet.Command)))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
packet.Value = val
|
||||
}
|
||||
case string:
|
||||
packet.Value = x.(string)
|
||||
default:
|
||||
msg := fmt.Sprintf("Unrecognized Redis data type: %v\n", reflect.TypeOf(x))
|
||||
log.Printf(msg)
|
||||
err = errors.New(msg)
|
||||
return
|
||||
}
|
||||
|
||||
if packet.Command != "" {
|
||||
if !isValidRedisCommand(commands, packet.Command) {
|
||||
err = errors.New(fmt.Sprintf("Unrecognized command: %s", string(packet.Command)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) process() (v interface{}, r RedisType, err error) {
|
||||
b, err := p.is.readByte()
|
||||
if err != nil {
|
||||
return nil, types[notApplicableByte], newConnectError(err.Error())
|
||||
}
|
||||
switch b {
|
||||
case plusByte:
|
||||
v, err = p.processSimpleString()
|
||||
r = types[plusByte]
|
||||
return
|
||||
case dollarByte:
|
||||
v, err = p.processBulkString()
|
||||
r = types[dollarByte]
|
||||
return
|
||||
case asteriskByte:
|
||||
v, err = p.processArray()
|
||||
r = types[asteriskByte]
|
||||
return
|
||||
case colonByte:
|
||||
v, err = p.processInteger()
|
||||
r = types[colonByte]
|
||||
return
|
||||
case minusByte:
|
||||
v, err = p.processError()
|
||||
r = types[minusByte]
|
||||
return
|
||||
default:
|
||||
return nil, types[notApplicableByte], newConnectError(fmt.Sprintf("Unknown reply: %b", b))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processSimpleString() ([]byte, error) {
|
||||
return p.is.readLineBytes()
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processBulkString() ([]byte, error) {
|
||||
l, err := p.is.readIntCrLf()
|
||||
if err != nil {
|
||||
return nil, newConnectError(err.Error())
|
||||
}
|
||||
if l == -1 {
|
||||
return nil, nil
|
||||
}
|
||||
line := make([]byte, 0)
|
||||
for {
|
||||
err := p.is.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b := p.is.Buf[p.is.count]
|
||||
p.is.count++
|
||||
if b == '\r' {
|
||||
err := p.is.ensureFill()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := p.is.Buf[p.is.count]
|
||||
p.is.count++
|
||||
if c != '\n' {
|
||||
return nil, newConnectError("Unexpected character!")
|
||||
}
|
||||
break
|
||||
} else {
|
||||
line = append(line, b)
|
||||
}
|
||||
}
|
||||
return line, nil
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processArray() ([]interface{}, error) {
|
||||
l, err := p.is.readIntCrLf()
|
||||
if err != nil {
|
||||
return nil, newConnectError(err.Error())
|
||||
}
|
||||
if l == -1 {
|
||||
return nil, nil
|
||||
}
|
||||
ret := make([]interface{}, 0)
|
||||
for i := 0; i < int(l); i++ {
|
||||
if obj, _, err := p.process(); err != nil {
|
||||
ret = append(ret, err)
|
||||
} else {
|
||||
ret = append(ret, obj)
|
||||
}
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processInteger() (int64, error) {
|
||||
return p.is.readIntCrLf()
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) processError() (interface{}, error) {
|
||||
msg, err := p.is.readLine()
|
||||
if err != nil {
|
||||
return nil, newConnectError(err.Error())
|
||||
}
|
||||
if strings.HasPrefix(msg, movedPrefix) {
|
||||
host, port, slot, err := p.parseTargetHostAndSlot(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fmt.Sprintf("MovedDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil
|
||||
} else if strings.HasPrefix(msg, askPrefix) {
|
||||
host, port, slot, err := p.parseTargetHostAndSlot(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fmt.Sprintf("AskDataError: %s host: %s port: %d slot: %d", msg, host, port, slot), nil
|
||||
} else if strings.HasPrefix(msg, clusterDownPrefix) {
|
||||
return fmt.Sprintf("ClusterError: %s", msg), nil
|
||||
} else if strings.HasPrefix(msg, busyPrefix) {
|
||||
return fmt.Sprintf("BusyError: %s", msg), nil
|
||||
} else if strings.HasPrefix(msg, noscriptPrefix) {
|
||||
return fmt.Sprintf("NoScriptError: %s", msg), nil
|
||||
}
|
||||
return fmt.Sprintf("DataError: %s", msg), nil
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) parseTargetHostAndSlot(clusterRedirectResponse string) (host string, po int, slot int, err error) {
|
||||
arr := strings.Split(clusterRedirectResponse, " ")
|
||||
host, port := p.extractParts(arr[2])
|
||||
slot, err = strconv.Atoi(arr[1])
|
||||
po, err = strconv.Atoi(port)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *RedisProtocol) extractParts(from string) (string, string) {
|
||||
idx := strings.LastIndex(from, ":")
|
||||
host := from
|
||||
if idx != -1 {
|
||||
host = from[0:idx]
|
||||
}
|
||||
port := ""
|
||||
if idx != -1 {
|
||||
port = from[idx+1:]
|
||||
}
|
||||
return host, port
|
||||
}
|
290
tap/extensions/redis/structs.go
Normal file
290
tap/extensions/redis/structs.go
Normal file
@ -0,0 +1,290 @@
|
||||
package main
|
||||
|
||||
type RedisType string
|
||||
type RedisCommand string
|
||||
type RedisKeyword string
|
||||
|
||||
var types map[rune]RedisType = map[rune]RedisType{
|
||||
plusByte: "Simple String",
|
||||
dollarByte: "Bulk String",
|
||||
asteriskByte: "Array",
|
||||
colonByte: "Integer",
|
||||
minusByte: "Error",
|
||||
notApplicableByte: "N/A",
|
||||
}
|
||||
|
||||
var commands []RedisCommand = []RedisCommand{
|
||||
"PING",
|
||||
"SET",
|
||||
"GET",
|
||||
"QUIT",
|
||||
"EXISTS",
|
||||
"DEL",
|
||||
"UNLINK",
|
||||
"TYPE",
|
||||
"FLUSHDB",
|
||||
"KEYS",
|
||||
"RANDOMKEY",
|
||||
"RENAME",
|
||||
"RENAMENX",
|
||||
"RENAMEX",
|
||||
"DBSIZE",
|
||||
"EXPIRE",
|
||||
"EXPIREAT",
|
||||
"TTL",
|
||||
"SELECT",
|
||||
"MOVE",
|
||||
"FLUSHALL",
|
||||
"GETSET",
|
||||
"MGET",
|
||||
"SETNX",
|
||||
"SETEX",
|
||||
"MSET",
|
||||
"MSETNX",
|
||||
"DECRBY",
|
||||
"DECR",
|
||||
"INCRBY",
|
||||
"INCR",
|
||||
"APPEND",
|
||||
"SUBSTR",
|
||||
"HSET",
|
||||
"HGET",
|
||||
"HSETNX",
|
||||
"HMSET",
|
||||
"HMGET",
|
||||
"HINCRBY",
|
||||
"HEXISTS",
|
||||
"HDEL",
|
||||
"HLEN",
|
||||
"HKEYS",
|
||||
"HVALS",
|
||||
"HGETALL",
|
||||
"RPUSH",
|
||||
"LPUSH",
|
||||
"LLEN",
|
||||
"LRANGE",
|
||||
"LTRIM",
|
||||
"LINDEX",
|
||||
"LSET",
|
||||
"LREM",
|
||||
"LPOP",
|
||||
"RPOP",
|
||||
"RPOPLPUSH",
|
||||
"SADD",
|
||||
"SMEMBERS",
|
||||
"SREM",
|
||||
"SPOP",
|
||||
"SMOVE",
|
||||
"SCARD",
|
||||
"SISMEMBER",
|
||||
"SINTER",
|
||||
"SINTERSTORE",
|
||||
"SUNION",
|
||||
"SUNIONSTORE",
|
||||
"SDIFF",
|
||||
"SDIFFSTORE",
|
||||
"SRANDMEMBER",
|
||||
"ZADD",
|
||||
"ZRANGE",
|
||||
"ZREM",
|
||||
"ZINCRBY",
|
||||
"ZRANK",
|
||||
"ZREVRANK",
|
||||
"ZREVRANGE",
|
||||
"ZCARD",
|
||||
"ZSCORE",
|
||||
"MULTI",
|
||||
"DISCARD",
|
||||
"EXEC",
|
||||
"WATCH",
|
||||
"UNWATCH",
|
||||
"SORT",
|
||||
"BLPOP",
|
||||
"BRPOP",
|
||||
"AUTH",
|
||||
"SUBSCRIBE",
|
||||
"PUBLISH",
|
||||
"UNSUBSCRIBE",
|
||||
"PSUBSCRIBE",
|
||||
"PUNSUBSCRIBE",
|
||||
"PUBSUB",
|
||||
"ZCOUNT",
|
||||
"ZRANGEBYSCORE",
|
||||
"ZREVRANGEBYSCORE",
|
||||
"ZREMRANGEBYRANK",
|
||||
"ZREMRANGEBYSCORE",
|
||||
"ZUNIONSTORE",
|
||||
"ZINTERSTORE",
|
||||
"ZLEXCOUNT",
|
||||
"ZRANGEBYLEX",
|
||||
"ZREVRANGEBYLEX",
|
||||
"ZREMRANGEBYLEX",
|
||||
"SAVE",
|
||||
"BGSAVE",
|
||||
"BGREWRITEAOF",
|
||||
"LASTSAVE",
|
||||
"SHUTDOWN",
|
||||
"INFO",
|
||||
"MONITOR",
|
||||
"SLAVEOF",
|
||||
"CONFIG",
|
||||
"STRLEN",
|
||||
"SYNC",
|
||||
"LPUSHX",
|
||||
"PERSIST",
|
||||
"RPUSHX",
|
||||
"ECHO",
|
||||
"LINSERT",
|
||||
"DEBUG",
|
||||
"BRPOPLPUSH",
|
||||
"SETBIT",
|
||||
"GETBIT",
|
||||
"BITPOS",
|
||||
"SETRANGE",
|
||||
"GETRANGE",
|
||||
"EVAL",
|
||||
"EVALSHA",
|
||||
"SCRIPT",
|
||||
"SLOWLOG",
|
||||
"OBJECT",
|
||||
"BITCOUNT",
|
||||
"BITOP",
|
||||
"SENTINEL",
|
||||
"DUMP",
|
||||
"RESTORE",
|
||||
"PEXPIRE",
|
||||
"PEXPIREAT",
|
||||
"PTTL",
|
||||
"INCRBYFLOAT",
|
||||
"PSETEX",
|
||||
"CLIENT",
|
||||
"TIME",
|
||||
"MIGRATE",
|
||||
"HINCRBYFLOAT",
|
||||
"SCAN",
|
||||
"HSCAN",
|
||||
"SSCAN",
|
||||
"ZSCAN",
|
||||
"WAIT",
|
||||
"CLUSTER",
|
||||
"ASKING",
|
||||
"PFADD",
|
||||
"PFCOUNT",
|
||||
"PFMERGE",
|
||||
"READONLY",
|
||||
"GEOADD",
|
||||
"GEODIST",
|
||||
"GEOHASH",
|
||||
"GEOPOS",
|
||||
"GEORADIUS",
|
||||
"GEORADIUS_RO",
|
||||
"GEORADIUSBYMEMBER",
|
||||
"GEORADIUSBYMEMBER_RO",
|
||||
"MODULE",
|
||||
"BITFIELD",
|
||||
"HSTRLEN",
|
||||
"TOUCH",
|
||||
"SWAPDB",
|
||||
"MEMORY",
|
||||
"XADD",
|
||||
"XLEN",
|
||||
"XDEL",
|
||||
"XTRIM",
|
||||
"XRANGE",
|
||||
"XREVRANGE",
|
||||
"XREAD",
|
||||
"XACK",
|
||||
"XGROUP",
|
||||
"XREADGROUP",
|
||||
"XPENDING",
|
||||
"XCLAIM",
|
||||
}
|
||||
|
||||
var keywords []RedisKeyword = []RedisKeyword{
|
||||
"AGGREGATE",
|
||||
"ALPHA",
|
||||
"ASC",
|
||||
"BY",
|
||||
"DESC",
|
||||
"GET",
|
||||
"LIMIT",
|
||||
"MESSAGE",
|
||||
"NO",
|
||||
"NOSORT",
|
||||
"PMESSAGE",
|
||||
"PSUBSCRIBE",
|
||||
"PUNSUBSCRIBE",
|
||||
"OK",
|
||||
"ONE",
|
||||
"QUEUED",
|
||||
"SET",
|
||||
"STORE",
|
||||
"SUBSCRIBE",
|
||||
"UNSUBSCRIBE",
|
||||
"WEIGHTS",
|
||||
"WITHSCORES",
|
||||
"RESETSTAT",
|
||||
"REWRITE",
|
||||
"RESET",
|
||||
"FLUSH",
|
||||
"EXISTS",
|
||||
"LOAD",
|
||||
"KILL",
|
||||
"LEN",
|
||||
"REFCOUNT",
|
||||
"ENCODING",
|
||||
"IDLETIME",
|
||||
"GETNAME",
|
||||
"SETNAME",
|
||||
"LIST",
|
||||
"MATCH",
|
||||
"COUNT",
|
||||
"PING",
|
||||
"PONG",
|
||||
"UNLOAD",
|
||||
"REPLACE",
|
||||
"KEYS",
|
||||
"PAUSE",
|
||||
"DOCTOR",
|
||||
"BLOCK",
|
||||
"NOACK",
|
||||
"STREAMS",
|
||||
"KEY",
|
||||
"CREATE",
|
||||
"MKSTREAM",
|
||||
"SETID",
|
||||
"DESTROY",
|
||||
"DELCONSUMER",
|
||||
"MAXLEN",
|
||||
"GROUP",
|
||||
"IDLE",
|
||||
"TIME",
|
||||
"RETRYCOUNT",
|
||||
"FORCE",
|
||||
}
|
||||
|
||||
type RedisPacket struct {
|
||||
Type RedisType `json:"type"`
|
||||
Command RedisCommand `json:"command"`
|
||||
Key string `json:"key"`
|
||||
Value string `json:"value"`
|
||||
Keyword RedisKeyword `json:"keyword"`
|
||||
}
|
||||
|
||||
func isValidRedisCommand(s []RedisCommand, c RedisCommand) bool {
|
||||
for _, v := range s {
|
||||
if v == c {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isValidRedisKeyword(s []RedisKeyword, c RedisKeyword) bool {
|
||||
for _, v := range s {
|
||||
if v == c {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
6
ui/package-lock.json
generated
6
ui/package-lock.json
generated
@ -13454,9 +13454,9 @@
|
||||
}
|
||||
},
|
||||
"react-scrollable-feed-virtualized": {
|
||||
"version": "1.4.2",
|
||||
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.2.tgz",
|
||||
"integrity": "sha512-j6M80ETqhSRBlygWe491gMPqCiAkVUQsLd/JR7DCKsZF44IYlJiunZWjdWe3//gxddTL68pjqq8pMvd1YN1bgw=="
|
||||
"version": "1.4.3",
|
||||
"resolved": "https://registry.npmjs.org/react-scrollable-feed-virtualized/-/react-scrollable-feed-virtualized-1.4.3.tgz",
|
||||
"integrity": "sha512-M9WgJKr57jCyWKNCksc3oi+xhtO0YbL9d7Ll8Sdc5ZWOIstNvdNbNX0k4Nq6kXUVaHCJ9qE8omdSI/CxT3MLAQ=="
|
||||
},
|
||||
"react-syntax-highlighter": {
|
||||
"version": "15.4.3",
|
||||
|
@ -21,7 +21,7 @@
|
||||
"react-copy-to-clipboard": "^5.0.3",
|
||||
"react-dom": "^17.0.2",
|
||||
"react-scripts": "4.0.3",
|
||||
"react-scrollable-feed-virtualized": "^1.4.2",
|
||||
"react-scrollable-feed-virtualized": "^1.4.3",
|
||||
"react-syntax-highlighter": "^15.4.3",
|
||||
"typescript": "^4.2.4",
|
||||
"web-vitals": "^1.1.1"
|
||||
|
@ -91,7 +91,7 @@ const App = () => {
|
||||
</table>
|
||||
|
||||
</span>
|
||||
|
||||
|
||||
return (
|
||||
<div className="mizuApp">
|
||||
<div className="header">
|
||||
|
@ -19,7 +19,8 @@ interface EntriesListProps {
|
||||
setNoMoreDataBottom: (flag: boolean) => void;
|
||||
methodsFilter: Array<string>;
|
||||
statusFilter: Array<string>;
|
||||
pathFilter: string
|
||||
pathFilter: string;
|
||||
serviceFilter: string;
|
||||
listEntryREF: any;
|
||||
onScrollEvent: (isAtBottom:boolean) => void;
|
||||
scrollableList: boolean;
|
||||
@ -32,7 +33,7 @@ enum FetchOperator {
|
||||
|
||||
const api = new Api();
|
||||
|
||||
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, listEntryREF, onScrollEvent, scrollableList}) => {
|
||||
export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, focusedEntryId, setFocusedEntryId, connectionOpen, noMoreDataTop, setNoMoreDataTop, noMoreDataBottom, setNoMoreDataBottom, methodsFilter, statusFilter, pathFilter, serviceFilter, listEntryREF, onScrollEvent, scrollableList}) => {
|
||||
|
||||
const [loadMoreTop, setLoadMoreTop] = useState(false);
|
||||
const [isLoadingTop, setIsLoadingTop] = useState(false);
|
||||
@ -54,10 +55,11 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
|
||||
const filterEntries = useCallback((entry) => {
|
||||
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
|
||||
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
|
||||
if(serviceFilter && entry.service?.toLowerCase()?.indexOf(serviceFilter) === -1) return;
|
||||
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
|
||||
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
|
||||
return entry;
|
||||
},[methodsFilter, pathFilter, statusFilter])
|
||||
},[methodsFilter, pathFilter, statusFilter, serviceFilter])
|
||||
|
||||
const filteredEntries = useMemo(() => {
|
||||
return entries.filter(filterEntries);
|
||||
|
@ -41,7 +41,7 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
|
||||
<Protocol protocol={protocol} horizontal={true}/>
|
||||
<div style={{right: "30px", position: "absolute", display: "flex"}}>
|
||||
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
|
||||
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
|
||||
{response.payload && <div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>}
|
||||
</div>
|
||||
</div>;
|
||||
};
|
||||
@ -71,7 +71,7 @@ export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
|
||||
/>
|
||||
{entryData.data && <EntrySummary data={entryData.data}/>}
|
||||
<>
|
||||
{entryData.data && <EntryViewer representation={entryData.representation} rulesMatched={entryData.rulesMatched} elapsedTime={entryData.data.elapsedTime} color={entryData.protocol.backgroundColor}/>}
|
||||
{entryData.data && <EntryViewer representation={entryData.representation} isRulesEnabled={entryData.isRulesEnabled} rulesMatched={entryData.rulesMatched} elapsedTime={entryData.data.elapsedTime} color={entryData.protocol.backgroundColor}/>}
|
||||
</>
|
||||
</>
|
||||
};
|
||||
|
@ -153,10 +153,8 @@ export const EntryTableSection: React.FC<EntrySectionProps> = ({title, color, ar
|
||||
|
||||
|
||||
interface EntryPolicySectionProps {
|
||||
service: string,
|
||||
title: string,
|
||||
color: string,
|
||||
response: any,
|
||||
latency?: number,
|
||||
arrayToIterate: any[],
|
||||
}
|
||||
@ -200,7 +198,7 @@ export const EntryPolicySectionContainer: React.FC<EntryPolicySectionContainerPr
|
||||
</CollapsibleContainer>
|
||||
}
|
||||
|
||||
export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({service, title, color, response, latency, arrayToIterate}) => {
|
||||
export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({title, color, latency, arrayToIterate}) => {
|
||||
return <React.Fragment>
|
||||
{
|
||||
arrayToIterate && arrayToIterate.length > 0 ?
|
||||
|
@ -33,8 +33,8 @@ const SectionsRepresentation: React.FC<any> = ({data, color}) => {
|
||||
return <>{sections}</>;
|
||||
}
|
||||
|
||||
const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapsedTime, color}) => {
|
||||
const TABS = [
|
||||
const AutoRepresentation: React.FC<any> = ({representation, isRulesEnabled, rulesMatched, elapsedTime, color}) => {
|
||||
var TABS = [
|
||||
{
|
||||
tab: 'request'
|
||||
},
|
||||
@ -54,6 +54,14 @@ const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapse
|
||||
|
||||
const {request, response} = JSON.parse(representation);
|
||||
|
||||
if (!response) {
|
||||
TABS[1]['hidden'] = true;
|
||||
}
|
||||
|
||||
if (!isRulesEnabled) {
|
||||
TABS.pop()
|
||||
}
|
||||
|
||||
return <div className={styles.Entry}>
|
||||
{<div className={styles.body}>
|
||||
<div className={styles.bodyHeader}>
|
||||
@ -63,11 +71,11 @@ const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapse
|
||||
{currentTab === TABS[0].tab && <React.Fragment>
|
||||
<SectionsRepresentation data={request} color={color}/>
|
||||
</React.Fragment>}
|
||||
{currentTab === TABS[1].tab && <React.Fragment>
|
||||
{response && currentTab === TABS[1].tab && <React.Fragment>
|
||||
<SectionsRepresentation data={response} color={color}/>
|
||||
</React.Fragment>}
|
||||
{currentTab === TABS[2].tab && <React.Fragment>
|
||||
<EntryTablePolicySection service={representation.service} title={'Rule'} color={color} latency={elapsedTime} response={response} arrayToIterate={rulesMatched ? rulesMatched : []}/>
|
||||
{TABS.length > 2 && currentTab === TABS[2].tab && <React.Fragment>
|
||||
<EntryTablePolicySection title={'Rule'} color={color} latency={elapsedTime} arrayToIterate={rulesMatched ? rulesMatched : []}/>
|
||||
</React.Fragment>}
|
||||
</div>}
|
||||
</div>;
|
||||
@ -75,13 +83,14 @@ const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapse
|
||||
|
||||
interface Props {
|
||||
representation: any;
|
||||
isRulesEnabled: boolean;
|
||||
rulesMatched: any;
|
||||
color: string;
|
||||
elapsedTime: number;
|
||||
}
|
||||
|
||||
const EntryViewer: React.FC<Props> = ({representation, rulesMatched, elapsedTime, color}) => {
|
||||
return <AutoRepresentation representation={representation} rulesMatched={rulesMatched} elapsedTime={elapsedTime} color={color}/>
|
||||
const EntryViewer: React.FC<Props> = ({representation, isRulesEnabled, rulesMatched, elapsedTime, color}) => {
|
||||
return <AutoRepresentation representation={representation} isRulesEnabled={isRulesEnabled} rulesMatched={rulesMatched} elapsedTime={elapsedTime} color={color}/>
|
||||
};
|
||||
|
||||
export default EntryViewer;
|
||||
|
@ -36,8 +36,8 @@
|
||||
border-left: 5px $failure-color solid
|
||||
|
||||
.ruleNumberText
|
||||
font-size: 12px;
|
||||
font-style: italic;
|
||||
font-size: 12px
|
||||
font-weight: 600
|
||||
|
||||
.ruleNumberTextFailure
|
||||
color: #DB2156
|
||||
|
@ -68,7 +68,7 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
|
||||
let rule = 'latency' in entry.rules
|
||||
if (rule) {
|
||||
if (entry.rules.latency !== -1) {
|
||||
if (entry.rules.latency >= entry.latency) {
|
||||
if (entry.rules.latency >= entry.latency || !('latency' in entry)) {
|
||||
additionalRulesProperties = styles.ruleSuccessRow
|
||||
ruleSuccess = true
|
||||
} else {
|
||||
|
@ -11,13 +11,16 @@ interface FiltersProps {
|
||||
setStatusFilter: (methods: Array<string>) => void;
|
||||
pathFilter: string
|
||||
setPathFilter: (val: string) => void;
|
||||
serviceFilter: string
|
||||
setServiceFilter: (val: string) => void;
|
||||
}
|
||||
|
||||
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter}) => {
|
||||
export const Filters: React.FC<FiltersProps> = ({methodsFilter, setMethodsFilter, statusFilter, setStatusFilter, pathFilter, setPathFilter, serviceFilter, setServiceFilter}) => {
|
||||
|
||||
return <div className={styles.container}>
|
||||
<MethodFilter methodsFilter={methodsFilter} setMethodsFilter={setMethodsFilter}/>
|
||||
<StatusTypesFilter statusFilter={statusFilter} setStatusFilter={setStatusFilter}/>
|
||||
<ServiceFilter serviceFilter={serviceFilter} setServiceFilter={setServiceFilter}/>
|
||||
<PathFilter pathFilter={pathFilter} setPathFilter={setPathFilter}/>
|
||||
</div>;
|
||||
};
|
||||
@ -117,3 +120,18 @@ const PathFilter: React.FC<PathFilterProps> = ({pathFilter, setPathFilter}) => {
|
||||
</FilterContainer>;
|
||||
};
|
||||
|
||||
interface ServiceFilterProps {
|
||||
serviceFilter: string;
|
||||
setServiceFilter: (val: string) => void;
|
||||
}
|
||||
|
||||
const ServiceFilter: React.FC<ServiceFilterProps> = ({serviceFilter, setServiceFilter}) => {
|
||||
|
||||
return <FilterContainer>
|
||||
<div className={styles.filterLabel}>Service</div>
|
||||
<div>
|
||||
<TextField value={serviceFilter} variant="outlined" className={styles.filterText} style={{minWidth: '150px'}} onChange={(e: any) => setServiceFilter(e.target.value)}/>
|
||||
</div>
|
||||
</FilterContainer>;
|
||||
};
|
||||
|
||||
|
@ -58,6 +58,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
const [methodsFilter, setMethodsFilter] = useState([]);
|
||||
const [statusFilter, setStatusFilter] = useState([]);
|
||||
const [pathFilter, setPathFilter] = useState("");
|
||||
const [serviceFilter, setServiceFilter] = useState("");
|
||||
|
||||
const [tappingStatus, setTappingStatus] = useState(null);
|
||||
|
||||
@ -192,6 +193,8 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
setStatusFilter={setStatusFilter}
|
||||
pathFilter={pathFilter}
|
||||
setPathFilter={setPathFilter}
|
||||
serviceFilter={serviceFilter}
|
||||
setServiceFilter={setServiceFilter}
|
||||
/>
|
||||
<div className={styles.container}>
|
||||
<EntriesList entries={entries}
|
||||
@ -206,6 +209,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
|
||||
methodsFilter={methodsFilter}
|
||||
statusFilter={statusFilter}
|
||||
pathFilter={pathFilter}
|
||||
serviceFilter={serviceFilter}
|
||||
listEntryREF={listEntry}
|
||||
onScrollEvent={onScrollEvent}
|
||||
scrollableList={disableScrollList}
|
||||
|
Loading…
Reference in New Issue
Block a user