Compare commits

...

15 Commits

Author SHA1 Message Date
Igor Gov
749bee6d55 Using rlog in agent and tapper & removing unreferenced code (#289)
* .
2021-09-22 06:24:19 +03:00
M. Mert Yıldıran
043b845c06 Fix some errors related to conversion to HAR (#286)
* Fix some errors related to conversion to HAR

* Fix the build error

* Fix the variable name

* Change to `Errorf`
2021-09-20 13:53:20 +03:00
Igor Gov
8c7f82c6f0 Fixing readme ignored-user-agents documentation (#288) 2021-09-20 12:37:35 +03:00
RoyUP9
ec4fa2ee4f additional acceptance tests (#287) 2021-09-20 11:03:15 +03:00
Selton Fiuza
b50eced489 [Refactor/TRA-3692] rename test rules to traffic validation (#281) 2021-09-19 14:47:19 +03:00
M. Mert Yıldıran
5392475486 Fix the issues related to sensitive data filtering feature (#285)
* Run acceptance tests on pull request

* Take `options.DisableRedaction` into account

* Log `defaultTapConfig`

* Pass the `SENSITIVE_DATA_FILTERING_OPTIONS` to tapper daemon set too

* Revert "Run acceptance tests on pull request"

This reverts commit ad79f1418f.
2021-09-19 13:33:34 +03:00
M. Mert Yıldıran
65bb262652 Fix the OOMKilled error by calling debug.FreeOSMemory periodically (#277)
* Fix the OOMKilled error by calling `debug.FreeOSMemory` periodically

* Remove `MAX_NUMBER_OF_GOROUTINES` environment variable

* Change the line

* Increase the default value of `TCP_STREAM_CHANNEL_TIMEOUT_MS` to `10000`
2021-09-19 12:31:09 +03:00
RoyUP9
842d95c836 fixed redact and regex masking tests (#284) 2021-09-19 11:52:11 +03:00
M. Mert Yıldıran
9fa9b67328 Bring back the sensitive data filtering feature (#280)
* Bring back the sensitive data filtering feature

* Add `// global` comment
2021-09-18 20:13:59 +03:00
Selton Fiuza
6337b75f0e [TRA-3659] Fix rules (#271)
* Fix rules

* Not reay, error on running

* Empty dissector Rules()

* almost working

* Finally, fixed

* undo changes on agent/pkg/utils/har.go

* fix not showing service on rules detail

* Update tap/api/api.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Update agent/pkg/controllers/entries_controller.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* Update agent/pkg/controllers/entries_controller.go

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>

* unwrap Data

* Fix bug off using more than one latency rule that always get the first.

* fix json type, decoding base64 before unmarshal

* Run `go mod tidy` on `cli/go.sum`

* Fix the linting issues

* Remove a `FIXME` comment

* Remove a CSS rule

* Adapt `ruleNumberText` CSS class to the design language of the UI

* Fix an issue in the UI related to `rule.Latency` slipping out

* Removed unecessary codes.

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: M. Mert Yildiran <mehmet@up9.com>
2021-09-18 14:02:18 -03:00
Igor Gov
b9d2e671c7 Move all docs to docs folder and clean project root (#278) 2021-09-15 11:53:23 +03:00
RoyUP9
0840642c98 testing guidelines (#276) 2021-09-14 16:57:39 +03:00
RoyUP9
d5b01347df fixed crash when channel is closed (#273) 2021-09-14 11:53:47 +03:00
M. Mert Yıldıran
7dca1ad889 Move stats_tracker.go into the extension API and increment MatchedPairs from inside the Emit method (#272)
* Move `stats_tracker.go` into the extension API and increment `MatchedPairs` from inside the `Emit` method

* Replace multiple `sync.Mutex`(es) with low-level atomic memory primitives
2021-09-13 16:22:16 +03:00
M. Mert Yıldıran
616eccb2cf Make ScrollableFeed virtualized by replacing react-scrollable-feed with react-scrollable-feed-virtualized (#268)
* Make `ScrollableFeed` virtualized by replacing `react-scrollable-feed` with `react-scrollable-feed-virtualized`

* fix get new entries button

* Fix the not populated `Protocol` struct in case of `GetEntries` endpoint is called

Co-authored-by: Liraz Yehezkel <lirazy@up9.com>
2021-09-13 16:18:43 +03:00
58 changed files with 1200 additions and 23323 deletions

View File

@@ -39,7 +39,7 @@ RUN go build -ldflags="-s -w \
-X 'mizuserver/pkg/version.BuildTimestamp=${BUILD_TIMESTAMP}' \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
COPY build_extensions.sh ..
COPY devops/build_extensions.sh ..
RUN cd .. && /bin/bash build_extensions.sh
FROM alpine:3.13.5

View File

@@ -44,11 +44,11 @@ push: push-docker push-cli ## Build and publish agent docker image & CLI.
push-docker: ## Build and publish agent docker image.
@echo "publishing Docker image .. "
./build-push-featurebranch.sh
devops/build-push-featurebranch.sh
build-docker-ci: ## Build agent docker image for CI.
@echo "building docker image for ci"
./build-agent-ci.sh
devops/build-agent-ci.sh
push-cli: ## Build and publish CLI.
@echo "publishing CLI .. "
@@ -73,7 +73,7 @@ clean-docker:
@(echo "DOCKER cleanup - NOT IMPLEMENTED YET " )
extensions:
./build_extensions.sh
devops/build_extensions.sh
test-cli:
@echo "running cli tests"; cd cli && $(MAKE) test

View File

@@ -46,7 +46,7 @@ While `mizu`most often works out of the box, you can influence its behavior:
1. [OPTIONAL] Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
2. `mizu` assumes user running the command has permissions to create resources (such as pods, services, namespaces) on your Kubernetes cluster (no worries - `mizu` resources are cleaned up upon termination)
For detailed list of k8s permissions see [PERMISSIONS](PERMISSIONS.md) document
For detailed list of k8s permissions see [PERMISSIONS](docs/PERMISSIONS.md) document
## How to Run
@@ -143,7 +143,7 @@ Setting `mizu-resources-namespace=mizu` resets Mizu to its default behavior
User-agent filtering (like health checks) - can be configured using command-line options:
```shell
$ mizu tap "^ca.*" --set ignored-user-agents=kube-probe --set ignored-user-agents=prometheus
$ mizu tap "^ca.*" --set tap.ignored-user-agents=kube-probe --set tap.ignored-user-agents=prometheus
+carts-66c77f5fbb-fq65r
+catalogue-5f4cb7cf5-7zrmn
Web interface is now available at http://localhost:8899

View File

@@ -1,15 +0,0 @@
![Mizu: The API Traffic Viewer for Kubernetes](assets/mizu-logo.svg)
# TESTING
Testing guidelines for Mizu project
## Unit-tests
* TBD
* TBD
* TBD
## System tests
* TBD
* TBD
* TBD

View File

@@ -0,0 +1,196 @@
package acceptanceTests
import (
"archive/zip"
"os/exec"
"testing"
)
func TestLogs(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
cliPath, cliPathErr := getCliPath()
if cliPathErr != nil {
t.Errorf("failed to get cli path, err: %v", cliPathErr)
return
}
tapCmdArgs := getDefaultTapCommandArgs()
tapNamespace := getDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
t.Cleanup(func() {
if err := cleanupCommand(tapCmd); err != nil {
t.Logf("failed to cleanup tap command, err: %v", err)
}
})
if err := tapCmd.Start(); err != nil {
t.Errorf("failed to start tap command, err: %v", err)
return
}
apiServerUrl := getApiServerUrl(defaultApiServerPort)
if err := waitTapPodsReady(apiServerUrl); err != nil {
t.Errorf("failed to start tap pods on time, err: %v", err)
return
}
logsCmdArgs := getDefaultLogsCommandArgs()
logsCmd := exec.Command(cliPath, logsCmdArgs...)
t.Logf("running command: %v", logsCmd.String())
if err := logsCmd.Start(); err != nil {
t.Errorf("failed to start logs command, err: %v", err)
return
}
if err := logsCmd.Wait(); err != nil {
t.Errorf("failed to wait logs command, err: %v", err)
return
}
logsPath, logsPathErr := getLogsPath()
if logsPathErr != nil {
t.Errorf("failed to get logs path, err: %v", logsPathErr)
return
}
zipReader, zipError := zip.OpenReader(logsPath)
if zipError != nil {
t.Errorf("failed to get zip reader, err: %v", zipError)
return
}
t.Cleanup(func() {
if err := zipReader.Close(); err != nil {
t.Logf("failed to close zip reader, err: %v", err)
}
})
var logsFileNames []string
for _, file := range zipReader.File {
logsFileNames = append(logsFileNames, file.Name)
}
if !Contains(logsFileNames, "mizu.mizu-api-server.log") {
t.Errorf("api server logs not found")
return
}
if !Contains(logsFileNames, "mizu_cli.log") {
t.Errorf("cli logs not found")
return
}
if !Contains(logsFileNames, "mizu_events.log") {
t.Errorf("events logs not found")
return
}
if !ContainsPartOfValue(logsFileNames, "mizu.mizu-tapper-daemon-set") {
t.Errorf("tapper logs not found")
return
}
}
func TestLogsPath(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
cliPath, cliPathErr := getCliPath()
if cliPathErr != nil {
t.Errorf("failed to get cli path, err: %v", cliPathErr)
return
}
tapCmdArgs := getDefaultTapCommandArgs()
tapNamespace := getDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
t.Cleanup(func() {
if err := cleanupCommand(tapCmd); err != nil {
t.Logf("failed to cleanup tap command, err: %v", err)
}
})
if err := tapCmd.Start(); err != nil {
t.Errorf("failed to start tap command, err: %v", err)
return
}
apiServerUrl := getApiServerUrl(defaultApiServerPort)
if err := waitTapPodsReady(apiServerUrl); err != nil {
t.Errorf("failed to start tap pods on time, err: %v", err)
return
}
logsCmdArgs := getDefaultLogsCommandArgs()
logsPath := "../logs.zip"
logsCmdArgs = append(logsCmdArgs, "-f", logsPath)
logsCmd := exec.Command(cliPath, logsCmdArgs...)
t.Logf("running command: %v", logsCmd.String())
if err := logsCmd.Start(); err != nil {
t.Errorf("failed to start logs command, err: %v", err)
return
}
if err := logsCmd.Wait(); err != nil {
t.Errorf("failed to wait logs command, err: %v", err)
return
}
zipReader, zipError := zip.OpenReader(logsPath)
if zipError != nil {
t.Errorf("failed to get zip reader, err: %v", zipError)
return
}
t.Cleanup(func() {
if err := zipReader.Close(); err != nil {
t.Logf("failed to close zip reader, err: %v", err)
}
})
var logsFileNames []string
for _, file := range zipReader.File {
logsFileNames = append(logsFileNames, file.Name)
}
if !Contains(logsFileNames, "mizu.mizu-api-server.log") {
t.Errorf("api server logs not found")
return
}
if !Contains(logsFileNames, "mizu_cli.log") {
t.Errorf("cli logs not found")
return
}
if !Contains(logsFileNames, "mizu_events.log") {
t.Errorf("events logs not found")
return
}
if !ContainsPartOfValue(logsFileNames, "mizu.mizu-tapper-daemon-set") {
t.Errorf("tapper logs not found")
return
}
}

View File

@@ -1,11 +1,14 @@
package acceptanceTests
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"path"
"strings"
"testing"
"time"
@@ -502,10 +505,19 @@ func TestTapRedact(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
headers := entryRequest["headers"].([]interface{})
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
headers := entryDetails["headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
@@ -518,8 +530,8 @@ func TestTapRedact(t *testing.T) {
}
}
data := entryRequest["postData"].(map[string]interface{})
textDataStr := data["text"].(string)
postData := entryDetails["postData"].(map[string]interface{})
textDataStr := postData["text"].(string)
var textData map[string]string
if parseErr := json.Unmarshal([]byte(textDataStr), &textData); parseErr != nil {
@@ -608,10 +620,19 @@ func TestTapNoRedact(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
headers := entryRequest["headers"].([]interface{})
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
headers := entryDetails["headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
@@ -624,8 +645,8 @@ func TestTapNoRedact(t *testing.T) {
}
}
data := entryRequest["postData"].(map[string]interface{})
textDataStr := data["text"].(string)
postData := entryDetails["postData"].(map[string]interface{})
textDataStr := postData["text"].(string)
var textData map[string]string
if parseErr := json.Unmarshal([]byte(textDataStr), &textData); parseErr != nil {
@@ -714,11 +735,20 @@ func TestTapRegexMasking(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
data := entryRequest["postData"].(map[string]interface{})
textData := data["text"].(string)
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
postData := entryDetails["postData"].(map[string]interface{})
textData := postData["text"].(string)
if textData != "[REDACTED]" {
return fmt.Errorf("unexpected result - body is not redacted")
@@ -731,3 +761,105 @@ func TestTapRegexMasking(t *testing.T) {
return
}
}
func TestTapDumpLogs(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
cliPath, cliPathErr := getCliPath()
if cliPathErr != nil {
t.Errorf("failed to get cli path, err: %v", cliPathErr)
return
}
tapCmdArgs := getDefaultTapCommandArgs()
tapNamespace := getDefaultTapNamespace()
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
tapCmdArgs = append(tapCmdArgs, "--set", "dump-logs=true")
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
if err := tapCmd.Start(); err != nil {
t.Errorf("failed to start tap command, err: %v", err)
return
}
apiServerUrl := getApiServerUrl(defaultApiServerPort)
if err := waitTapPodsReady(apiServerUrl); err != nil {
t.Errorf("failed to start tap pods on time, err: %v", err)
return
}
if err := cleanupCommand(tapCmd); err != nil {
t.Errorf("failed to cleanup tap command, err: %v", err)
return
}
mizuFolderPath, mizuPathErr := getMizuFolderPath()
if mizuPathErr != nil {
t.Errorf("failed to get mizu folder path, err: %v", mizuPathErr)
return
}
files, readErr := ioutil.ReadDir(mizuFolderPath)
if readErr != nil {
t.Errorf("failed to read mizu folder files, err: %v", readErr)
return
}
var dumpsLogsPath string
for _, file := range files {
fileName := file.Name()
if strings.Contains(fileName, "mizu_logs") {
dumpsLogsPath = path.Join(mizuFolderPath, fileName)
break
}
}
if dumpsLogsPath == "" {
t.Errorf("dump logs file not found")
return
}
zipReader, zipError := zip.OpenReader(dumpsLogsPath)
if zipError != nil {
t.Errorf("failed to get zip reader, err: %v", zipError)
return
}
t.Cleanup(func() {
if err := zipReader.Close(); err != nil {
t.Logf("failed to close zip reader, err: %v", err)
}
})
var logsFileNames []string
for _, file := range zipReader.File {
logsFileNames = append(logsFileNames, file.Name)
}
if !Contains(logsFileNames, "mizu.mizu-api-server.log") {
t.Errorf("api server logs not found")
return
}
if !Contains(logsFileNames, "mizu_cli.log") {
t.Errorf("cli logs not found")
return
}
if !Contains(logsFileNames, "mizu_events.log") {
t.Errorf("events logs not found")
return
}
if !ContainsPartOfValue(logsFileNames, "mizu.mizu-tapper-daemon-set") {
t.Errorf("tapper logs not found")
return
}
}

View File

@@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"path"
"strings"
"syscall"
"time"
)
@@ -32,13 +33,22 @@ func getCliPath() (string, error) {
return cliPath, nil
}
func getConfigPath() (string, error) {
func getMizuFolderPath() (string, error) {
home, homeDirErr := os.UserHomeDir()
if homeDirErr != nil {
return "", homeDirErr
}
return path.Join(home, ".mizu", "config.yaml"), nil
return path.Join(home, ".mizu"), nil
}
func getConfigPath() (string, error) {
mizuFolderPath, mizuPathError := getMizuFolderPath()
if mizuPathError != nil {
return "", mizuPathError
}
return path.Join(mizuFolderPath, "config.yaml"), nil
}
func getProxyUrl(namespace string, service string) string {
@@ -72,6 +82,13 @@ func getDefaultTapCommandArgsWithRegex(regex string) []string {
return append([]string{tapCommand, regex}, defaultCmdArgs...)
}
func getDefaultLogsCommandArgs() []string {
logsCommand := "logs"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{logsCommand}, defaultCmdArgs...)
}
func getDefaultTapNamespace() []string {
return []string{"-n", "mizu-tests"}
}
@@ -193,3 +210,33 @@ func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
return pods, nil
}
func getLogsPath() (string, error) {
dir, filePathErr := os.Getwd()
if filePathErr != nil {
return "", filePathErr
}
logsPath := path.Join(dir, "mizu_logs.zip")
return logsPath, nil
}
func Contains(slice []string, containsValue string) bool {
for _, sliceValue := range slice {
if sliceValue == containsValue {
return true
}
}
return false
}
func ContainsPartOfValue(slice []string, containsValue string) bool {
for _, sliceValue := range slice {
if strings.Contains(sliceValue, containsValue) {
return true
}
}
return false
}

View File

@@ -50,19 +50,22 @@ func main() {
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
}
filteringOptions := getTrafficFilteringOptions()
if *standaloneMode {
api.StartResolving(*namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions)
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
// go api.StartReadingOutbound(outboundLinkOutputChannel)
hostApi(nil)
} else if *tapperMode {
rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
if *apiServerAddress == "" {
panic("API server address must be provided with --api-server-address when using --tap")
}
@@ -73,23 +76,22 @@ func main() {
rlog.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
}
// harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions)
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
rlog.Infof("Connected successfully to websocket %s", *apiServerAddress)
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
// go pipeOutboundLinksChannelToSocket(socketConnection, outboundLinkOutputChannel)
} else if *apiServerMode {
api.StartResolving(*namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(outputItemsChannel)
@@ -97,7 +99,7 @@ func main() {
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredHarChannel, getTrafficFilteringOptions())
go filterItems(outputItemsChannel, filteredHarChannel, filteringOptions)
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
hostApi(nil)
}
@@ -121,7 +123,7 @@ func loadExtensions() {
extensionsMap = make(map[string]*tapApi.Extension)
for i, file := range files {
filename := file.Name()
log.Printf("Loading extension: %s\n", filename)
rlog.Infof("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
@@ -225,23 +227,23 @@ func getTapTargets() []string {
return tappedAddressesPerNodeDict[nodeName]
}
func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
if filteringOptionsJson == "" {
return &shared.TrafficFilteringOptions{
return &tapApi.TrafficFilteringOptions{
HealthChecksUserAgentHeaders: []string{},
}
}
var filteringOptions shared.TrafficFilteringOptions
var filteringOptions tapApi.TrafficFilteringOptions
err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions)
if err != nil {
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the api.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
}
return &filteringOptions
}
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *tapApi.TrafficFilteringOptions) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
@@ -251,10 +253,6 @@ func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *ta
continue
}
// if !filterOptions.DisableRedaction {
// sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
// }
outChannel <- message
}
}
@@ -293,7 +291,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
for messageData := range messageDataChannel {
marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData)
if err != nil {
rlog.Infof("error converting message to json %s, (%v,%+v)\n", err, err, err)
rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
@@ -301,26 +299,8 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
// and goes into the intermediate WebSocket.
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Infof("error sending message through socket server %s, (%v,%+v)\n", err, err, err)
rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
}
}
func pipeOutboundLinksChannelToSocket(connection *websocket.Conn, outboundLinkChannel <-chan *tap.OutboundLink) {
for outboundLink := range outboundLinkChannel {
if outboundLink.SuggestedProtocol == tap.TLSProtocol {
marshaledData, err := models.CreateWebsocketOutboundLinkMessage(outboundLink)
if err != nil {
rlog.Infof("Error converting outbound link to json %s, (%v,%+v)", err, err, err)
continue
}
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Infof("error sending outbound link message through socket server %s, (%v,%+v)", err, err, err)
continue
}
}
}
}

View File

@@ -116,6 +116,17 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
baseEntry := extension.Dissector.Summarize(mizuEntry)
mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
database.CreateEntry(mizuEntry)
if extension.Protocol.Name == "http" {
var pair tapApi.RequestResponsePair
json.Unmarshal([]byte(mizuEntry.Entry), &pair)
harEntry, err := utils.NewEntry(&pair)
if err == nil {
rules, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
baseEntry.Rules = rules
baseEntry.Latency = mizuEntry.ElapsedTime
}
}
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry)
BroadcastToBrowserClients(baseEntryBytes)
}

View File

@@ -3,7 +3,6 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/google/martian/har"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
@@ -13,6 +12,8 @@ import (
"net/http"
"time"
"github.com/google/martian/har"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
@@ -140,11 +141,23 @@ func GetEntry(c *gin.Context) {
extension := extensionsMap[entryData.ProtocolName]
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
var rules []map[string]interface{}
if entryData.ProtocolName == "http" {
var pair tapApi.RequestResponsePair
json.Unmarshal([]byte(entryData.Entry), &pair)
harEntry, _ := utils.NewEntry(&pair)
_, rulesMatched := models.RunValidationRulesState(*harEntry, entryData.Service)
inrec, _ := json.Marshal(rulesMatched)
json.Unmarshal(inrec, &rules)
}
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entryData,
Rules: rules,
})
}

View File

@@ -2,9 +2,10 @@ package models
import (
"encoding/json"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/rules"
"mizuserver/pkg/utils"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
@@ -15,15 +16,6 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error {
return v.UnmarshalData(r)
}
// TODO: until we fixed the Rules feature
//func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
// ar := tapApi.ApplicableRules{}
// ar.Status = status
// ar.Latency = latency
// ar.NumberOfRules = number
// return ar
//}
type EntriesFilter struct {
Limit int `form:"limit" validate:"required,min=1,max=200"`
Operator string `form:"operator" validate:"required,oneof='lt' 'gt'"`
@@ -105,33 +97,8 @@ type ExtendedCreator struct {
Source *string `json:"_source"`
}
type FullEntryWithPolicy struct {
RulesMatched []rules.RulesMatched `json:"rulesMatched,omitempty"`
Entry har.Entry `json:"entry"`
Service string `json:"service"`
func RunValidationRulesState(harEntry har.Entry, service string) (tapApi.ApplicableRules, []rules.RulesMatched) {
resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend)
return tapApi.ApplicableRules{Status: statusPolicyToSend, Latency: latency, NumberOfRules: numberOfRules}, resultPolicyToSend
}
func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(entry.Entry), &pair); err != nil {
return err
}
harEntry, err := utils.NewEntry(&pair)
if err != nil {
return err
}
fewp.Entry = *harEntry
_, resultPolicyToSend := rules.MatchRequestPolicy(fewp.Entry, entry.Service)
fewp.RulesMatched = resultPolicyToSend
fewp.Service = entry.Service
return nil
}
// TODO: until we fixed the Rules feature
//func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
// numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
// statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
// ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
// return ar
//}

View File

@@ -1,8 +1,10 @@
package rules
import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/romana/rlog"
"reflect"
"regexp"
"strings"
@@ -41,7 +43,7 @@ func ValidateService(serviceFromRule string, service string) bool {
return true
}
func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched) {
func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
enforcePolicy, _ := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
var resultPolicyToSend []RulesMatched
for _, rule := range enforcePolicy.Rules {
@@ -50,7 +52,8 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched
}
if rule.Type == "json" {
var bodyJsonMap interface{}
if err := json.Unmarshal(harEntry.Response.Content.Text, &bodyJsonMap); err != nil {
contentTextDecoded, _ := base64.StdEncoding.DecodeString(string(harEntry.Response.Content.Text))
if err := json.Unmarshal(contentTextDecoded, &bodyJsonMap); err != nil {
continue
}
out, err := jsonpath.Read(bodyJsonMap, rule.Key)
@@ -63,6 +66,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched
if err != nil {
continue
}
rlog.Info(matchValue, rule.Value)
} else {
val := fmt.Sprint(out)
matchValue, err = regexp.MatchString(rule.Value, val)
@@ -89,22 +93,28 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (int, []RulesMatched
resultPolicyToSend = appendRulesMatched(resultPolicyToSend, true, rule)
}
}
return len(enforcePolicy.Rules), resultPolicyToSend
return resultPolicyToSend
}
func PassedValidationRules(rulesMatched []RulesMatched, numberOfRules int) (bool, int64, int) {
if len(rulesMatched) == 0 {
return false, 0, 0
func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
var numberOfRulesMatched = len(rulesMatched)
var latency int64 = -1
if numberOfRulesMatched == 0 {
return false, 0, numberOfRulesMatched
}
for _, rule := range rulesMatched {
if rule.Matched == false {
return false, -1, len(rulesMatched)
return false, latency, numberOfRulesMatched
} else {
if strings.ToLower(rule.Rule.Type) == "latency" {
if rule.Rule.Latency < latency || latency == -1 {
latency = rule.Rule.Latency
}
}
}
}
for _, rule := range rulesMatched {
if strings.ToLower(rule.Rule.Type) == "latency" {
return true, rule.Rule.Latency, len(rulesMatched)
}
}
return true, -1, len(rulesMatched)
return true, latency, numberOfRulesMatched
}

View File

@@ -4,16 +4,14 @@ import (
"bytes"
"errors"
"fmt"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
"strconv"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap"
"github.com/up9inc/mizu/tap/api"
)
// Keep it because we might want cookies in the future
//func BuildCookies(rawCookies []interface{}) []har.Cookie {
// cookies := make([]har.Cookie, 0, len(rawCookies))
@@ -82,7 +80,7 @@ func BuildHeaders(rawHeaders []interface{}) ([]har.Header, string, string, strin
path = h["value"].(string)
}
if h["name"] == ":status" {
path = h["value"].(string)
status = h["value"].(string)
}
}
@@ -205,7 +203,7 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
if strings.HasPrefix(mimeType.(string), "application/grpc") {
status, err = strconv.Atoi(_status)
if err != nil {
tap.SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
rlog.Errorf("Failed converting status to int %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response status to int for HAR")
}
}
@@ -226,14 +224,13 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
harRequest, err := NewRequest(&pair.Request)
if err != nil {
tap.SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
rlog.Errorf("Failed converting request to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting request to HAR")
}
harResponse, err := NewResponse(&pair.Response)
if err != nil {
fmt.Printf("err: %+v\n", err)
tap.SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
rlog.Errorf("Failed converting response to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response to HAR")
}

View File

@@ -1,8 +1,8 @@
package shared
package utils
import (
"fmt"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"time"
)
@@ -11,24 +11,23 @@ const (
DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
)
func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) {
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
var err error
var connection *websocket.Conn
try := 0
// Connection to server fails if client pod is up before server.
// Retries solve this issue.
for try < retries {
for try < DEFAULT_SOCKET_RETRIES {
rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES)
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil {
rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err)
try++
if !hideTimeoutErrors {
fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
}
} else {
break
}
time.Sleep(retrySleepTime)
time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME)
}
if err != nil {

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
"log"
"net/http"
"net/url"
"os"
@@ -18,8 +17,8 @@ import (
func StartServer(app *gin.Engine) {
signals := make(chan os.Signal, 2)
signal.Notify(signals,
os.Interrupt, // this catch ctrl + c
syscall.SIGTSTP, // this catch ctrl + z
os.Interrupt, // this catch ctrl + c
syscall.SIGTSTP, // this catch ctrl + z
)
srv := &http.Server{
@@ -36,8 +35,9 @@ func StartServer(app *gin.Engine) {
}()
// Run server.
rlog.Infof("Starting the server...")
if err := app.Run(":8899"); err != nil {
log.Printf("Oops... Server is not running! Reason: %v", err)
rlog.Errorf("Server is not running! Reason: %v", err)
}
}
@@ -54,15 +54,15 @@ func ReverseSlice(data interface{}) {
func CheckErr(e error) {
if e != nil {
log.Printf("%v", e)
rlog.Infof("%v", e)
//panic(e)
}
}
func SetHostname(address, newHostname string) string {
replacedUrl, err := url.Parse(address)
if err != nil{
log.Printf("error replacing hostname to %s in address %s, returning original %v",newHostname, address, err)
if err != nil {
rlog.Infof("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
return address
}
replacedUrl.Host = newHostname

View File

@@ -2,11 +2,13 @@ package cmd
import (
"errors"
"fmt"
"os"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/telemetry"
"os"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
@@ -66,4 +68,7 @@ func init() {
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
tapCmd.Flags().String(configStructs.EnforcePolicyFileDeprecated, defaultTapConfig.EnforcePolicyFileDeprecated, "Yaml file with policy rules")
tapCmd.Flags().MarkDeprecated(configStructs.EnforcePolicyFileDeprecated, fmt.Sprintf("Use --%s instead", configStructs.EnforcePolicyFile))
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/tap/api"
yaml "gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
@@ -46,14 +47,23 @@ func RunMizuTap() {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error parsing regex-masking: %v", errormessage.FormatError(err)))
return
}
var mizuValidationRules string
if config.Config.Tap.EnforcePolicyFile != "" {
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
if config.Config.Tap.EnforcePolicyFile != "" || config.Config.Tap.EnforcePolicyFileDeprecated != "" {
var trafficValidation string
if config.Config.Tap.EnforcePolicyFile != "" {
trafficValidation = config.Config.Tap.EnforcePolicyFile
} else {
trafficValidation = config.Config.Tap.EnforcePolicyFileDeprecated
}
mizuValidationRules, err = readValidationRules(trafficValidation)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
return
}
}
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
if err != nil {
logger.Log.Error(err)
@@ -108,7 +118,7 @@ func RunMizuTap() {
}
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel)
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions)
//block until exit signal or error
waitForFinish(ctx, cancel)
@@ -123,7 +133,7 @@ func readValidationRules(file string) (string, error) {
return string(newContent), nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions, mizuValidationRules string) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
@@ -134,7 +144,7 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
return err
}
@@ -158,7 +168,7 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
var err error
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
@@ -199,13 +209,13 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
return nil
}
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
var compiledRegexSlice []*shared.SerializableRegexp
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
var compiledRegexSlice []*api.SerializableRegexp
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
compiledRegexSlice = make([]*shared.SerializableRegexp, 0)
compiledRegexSlice = make([]*api.SerializableRegexp, 0)
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
compiledRegex, err := api.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
return nil, err
}
@@ -213,14 +223,14 @@ func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
}
}
return &shared.TrafficFilteringOptions{
return &api.TrafficFilteringOptions{
PlainTextMaskingRegexes: compiledRegexSlice,
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
DisableRedaction: config.Config.Tap.DisableRedaction,
}, nil
}
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string) error {
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
if len(nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if state.mizuServiceAccountExists {
@@ -240,6 +250,7 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
serviceAccountName,
config.Config.Tap.TapperResources,
config.Config.ImagePullPolicy(),
mizuApiFilteringOptions,
); err != nil {
return err
}
@@ -346,7 +357,7 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) {
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) {
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, config.Config.Tap.PodRegex())
restartTappers := func() {
@@ -370,7 +381,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
cancel()
}
@@ -379,13 +390,28 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
for {
select {
case pod := <-added:
case pod, ok := <-added:
if !ok {
added = nil
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod := <-removed:
case pod, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod := <-modified:
case pod, ok := <-modified:
if !ok {
modified = nil
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
@@ -396,8 +422,12 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
case err := <-errorChan:
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
@@ -477,21 +507,28 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
timeAfter := time.After(25 * time.Second)
for {
select {
case <-ctx.Done():
logger.Log.Debugf("Watching API Server pod loop, ctx done")
return
case <-added:
case _, ok := <-added:
if !ok {
added = nil
continue
}
logger.Log.Debugf("Watching API Server pod loop, added")
continue
case <-removed:
case _, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Infof("%s removed", mizu.ApiServerPodName)
cancel()
return
case modifiedPod := <-modified:
if modifiedPod == nil {
logger.Log.Debugf("Watching API Server pod loop, modifiedPod with nil")
case modifiedPod, ok := <-modified:
if !ok {
modified = nil
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
@@ -510,14 +547,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case _, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
cancel()
case <-timeAfter:
if !isPodReady {
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
cancel()
}
case <-errorChan:
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
cancel()
case <-ctx.Done():
logger.Log.Debugf("Watching API Server pod loop, ctx done")
return
}
}
}

View File

@@ -16,7 +16,8 @@ const (
DisableRedactionTapName = "no-redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
DryRunTapName = "dry-run"
EnforcePolicyFile = "test-rules"
EnforcePolicyFile = "traffic-validation"
EnforcePolicyFileDeprecated = "test-rules"
)
type TapConfig struct {
@@ -32,7 +33,8 @@ type TapConfig struct {
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"test-rules"`
EnforcePolicyFile string `yaml:"traffic-validation"`
EnforcePolicyFileDeprecated string `yaml:"test-rules"`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
}

View File

@@ -11,6 +11,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.2
@@ -19,3 +20,5 @@ require (
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -18,6 +18,7 @@ import (
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -150,7 +151,7 @@ type ApiServerOptions struct {
PodImage string
ServiceAccountName string
IsNamespaceRestricted bool
MizuApiFilteringOptions *shared.TrafficFilteringOptions
MizuApiFilteringOptions *api.TrafficFilteringOptions
MaxEntriesDBSizeBytes int64
Resources configStructs.Resources
ImagePullPolicy core.PullPolicy
@@ -575,7 +576,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
return nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources configStructs.Resources, imagePullPolicy core.PullPolicy) error {
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources configStructs.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
if len(nodeToTappedPodIPMap) == 0 {
@@ -587,6 +588,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
return err
}
marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions)
if err != nil {
return err
}
mizuCmd := []string{
"./mizuagent",
"-i", "any",
@@ -605,6 +611,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
applyconfcore.EnvVar().WithName(shared.MizuFilteringOptionsEnvVar).WithValue(string(marshaledFilteringOptions)),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(

View File

@@ -1,42 +0,0 @@
package mizu
import (
"encoding/json"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
"time"
)
type ControlSocket struct {
connection *websocket.Conn
}
func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) {
connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true)
if err != nil {
return nil, err
} else {
return &ControlSocket{connection: connection}, nil
}
}
func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(pods []core.Pod) error {
podInfos := make([]shared.PodInfo, 0)
for _, pod := range pods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
}
tapStatus := shared.TapStatus{Pods: podInfos}
socketMessage := shared.CreateWebSocketStatusMessage(tapStatus)
jsonMessage, err := json.Marshal(socketMessage)
if err != nil {
return err
}
err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage)
if err != nil {
return err
}
return nil
}

View File

@@ -1,4 +1,4 @@
![Mizu: The API Traffic Viewer for Kubernetes](assets/mizu-logo.svg)
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Contributing to Mizu

View File

@@ -1,4 +1,4 @@
![Mizu: The API Traffic Viewer for Kubernetes](assets/mizu-logo.svg)
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Kubernetes permissions for MIZU
This document describes in details all permissions required for full and correct operation of Mizu

View File

@@ -24,14 +24,14 @@ To use this feature - create simple rules file (see details below) and pass this
```shell
mizu tap --test-rules rules.yaml PODNAME
mizu tap --traffic-validation rules.yaml PODNAME
```
## Rules file structure
The structure of the test-rules-file is:
The structure of the traffic-validation-file is:
* `name`: string, name of the rule
* `type`: string, type of the rule, must be `json` or `header` or `latency`

33
docs/TESTING.md Normal file
View File

@@ -0,0 +1,33 @@
![Mizu: The API Traffic Viewer for Kubernetes](../assets/mizu-logo.svg)
# Testing guidelines
## Generic guidelines
* Use "[testing](https://pkg.go.dev/testing)" package
* Write [Table-driven tests using subtests](https://go.dev/blog/subtests)
* Use cleanup in test/subtest in order to clean up resources
* Name the test func "Test<tested_func_name><tested_case>"
## Unit tests
* Position the test file inside the folder of the tested package
* In case of internal func testing
* Name the test file "<tested_file_name>_internal_test.go"
* Name the test package same as the package being tested
* Example - [Config](../cli/config/config_internal_test.go)
* In case of exported func testing
* Name the test file "<tested_file_name>_test.go"
* Name the test package "<tested_package>_test"
* Example - [Slice Utils](../cli/mizu/sliceUtils_test.go)
* Make sure to run test coverage to make sure you covered all the cases and lines in the func
## Acceptance tests
* Position the test file inside the [acceptance tests folder](../acceptanceTests)
* Name the file "<tested_command>_test.go"
* Name the package "acceptanceTests"
* Do not run as part of the short tests
* Use/Create generic test utils func in acceptanceTests/testsUtils
* Don't use sleep inside the tests - active check
* Running acceptance tests locally
* Switch to the branch that is being tested
* Run acceptanceTests/setup.sh
* Run tests (make acceptance-test)
* Example - [Tap](../acceptanceTests/tap_test.go)

View File

@@ -74,12 +74,6 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc
}
}
type TrafficFilteringOptions struct {
HealthChecksUserAgentHeaders []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
}
type VersionResponse struct {
SemVer string `json:"semver"`
}
@@ -99,6 +93,11 @@ type RulePolicy struct {
Name string `yaml:"name"`
}
type RulesMatched struct {
Matched bool `json:"matched"`
Rule RulePolicy `json:"rule"`
}
func (r *RulePolicy) validateType() bool {
permitedTypes := []string{"json", "header", "latency"}
_, found := Find(permitedTypes, r.Type)

View File

@@ -80,13 +80,14 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
}
type Emitting struct {
AppStats *AppStats
OutputChannel chan *OutputChannelItem
}
@@ -96,39 +97,47 @@ type Emitter interface {
func (e *Emitting) Emit(item *OutputChannelItem) {
e.OutputChannel <- item
e.AppStats.IncMatchedPairs()
}
type MizuEntry struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
ProtocolName string `json:"protocolKey" gorm:"column:protocolKey"`
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
ProtocolName string `json:"protocolName" gorm:"column:protocolName"`
ProtocolLongName string `json:"protocolLongName" gorm:"column:protocolLongName"`
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolVersion"`
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
ProtocolBackgroundColor string `json:"protocolBackgroundColor" gorm:"column:protocolBackgroundColor"`
ProtocolForegroundColor string `json:"protocolForegroundColor" gorm:"column:protocolForegroundColor"`
ProtocolFontSize int8 `json:"protocolFontSize" gorm:"column:protocolFontSize"`
ProtocolReferenceLink string `json:"protocolReferenceLink" gorm:"column:protocolReferenceLink"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
}
type MizuEntryWrapper struct {
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
Rules []map[string]interface{} `json:"rulesMatched,omitempty"`
}
type BaseEntryDetails struct {
@@ -162,11 +171,19 @@ type DataUnmarshaler interface {
}
func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
entryUrl := entry.Url
service := entry.Service
bed.Protocol = Protocol{
Name: entry.ProtocolName,
LongName: entry.ProtocolLongName,
Abbreviation: entry.ProtocolAbbreviation,
Version: entry.ProtocolVersion,
BackgroundColor: entry.ProtocolBackgroundColor,
ForegroundColor: entry.ProtocolForegroundColor,
FontSize: entry.ProtocolFontSize,
ReferenceLink: entry.ProtocolReferenceLink,
}
bed.Id = entry.EntryId
bed.Url = entryUrl
bed.Service = service
bed.Url = entry.Url
bed.Service = entry.Service
bed.Summary = entry.Path
bed.StatusCode = entry.Status
bed.Method = entry.Method

7
tap/api/options.go Normal file
View File

@@ -0,0 +1,7 @@
package api
type TrafficFilteringOptions struct {
HealthChecksUserAgentHeaders []string
PlainTextMaskingRegexes []*SerializableRegexp
DisableRedaction bool
}

View File

@@ -1,4 +1,4 @@
package shared
package api
import "regexp"

70
tap/api/stats_tracker.go Normal file
View File

@@ -0,0 +1,70 @@
package api
import (
"sync/atomic"
"time"
)
type AppStats struct {
StartTime time.Time `json:"-"`
ProcessedBytes uint64 `json:"processedBytes"`
PacketsCount uint64 `json:"packetsCount"`
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
MatchedPairs uint64 `json:"matchedPairs"`
DroppedTcpStreams uint64 `json:"droppedTcpStreams"`
}
func (as *AppStats) IncMatchedPairs() {
atomic.AddUint64(&as.MatchedPairs, 1)
}
func (as *AppStats) IncDroppedTcpStreams() {
atomic.AddUint64(&as.DroppedTcpStreams, 1)
}
func (as *AppStats) IncPacketsCount() uint64 {
atomic.AddUint64(&as.PacketsCount, 1)
return as.PacketsCount
}
func (as *AppStats) IncTcpPacketsCount() {
atomic.AddUint64(&as.TcpPacketsCount, 1)
}
func (as *AppStats) IncReassembledTcpPayloadsCount() {
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
}
func (as *AppStats) IncTlsConnectionsCount() {
atomic.AddUint64(&as.TlsConnectionsCount, 1)
}
func (as *AppStats) UpdateProcessedBytes(size uint64) {
atomic.AddUint64(&as.ProcessedBytes, size)
}
func (as *AppStats) SetStartTime(startTime time.Time) {
as.StartTime = startTime
}
func (as *AppStats) DumpStats() *AppStats {
currentAppStats := &AppStats{StartTime: as.StartTime}
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams)
return currentAppStats
}
func resetUint64(ref *uint64) (val uint64) {
val = atomic.LoadUint64(ref)
atomic.StoreUint64(ref, 0)
return
}

View File

@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
r := AmqpReader{b}
var remaining int
@@ -267,25 +267,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
request["url"] = summary
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
ProtocolVersion: protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: request["method"].(string),
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: protocol.Name,
ProtocolLongName: protocol.LongName,
ProtocolAbbreviation: protocol.Abbreviation,
ProtocolVersion: protocol.Version,
ProtocolBackgroundColor: protocol.BackgroundColor,
ProtocolForegroundColor: protocol.ForegroundColor,
ProtocolFontSize: protocol.FontSize,
ProtocolReferenceLink: protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: request["method"].(string),
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}

View File

@@ -3,6 +3,7 @@ module github.com/up9inc/mizu/tap/extensions/http
go 1.16
require (
github.com/beevik/etree v1.1.0 // indirect
github.com/google/martian v2.1.0+incompatible
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -1,3 +1,5 @@
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 h1:jkvpcEatpwuMF5O5LVxTnehj6YZ/aEZN4NWD/Xml4pI=

View File

@@ -13,7 +13,14 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
if !options.DisableRedaction {
FilterSensitiveData(item, options)
}
emitter.Emit(item)
}
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil {
return err
@@ -64,13 +71,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTime
if item != nil {
item.Protocol = http2Protocol
emitter.Emit(item)
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
req, err := http.ReadRequest(b)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -107,12 +114,12 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
emitter.Emit(item)
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
res, err := http.ReadResponse(b, nil)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -157,7 +164,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
emitter.Emit(item)
filterAndEmit(item, emitter, options)
}
return nil
}

View File

@@ -61,7 +61,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@@ -85,7 +85,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -94,7 +94,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -103,7 +103,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
dissected = true
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -172,25 +172,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
ProtocolVersion: item.Protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: protocol.Name,
ProtocolLongName: protocol.LongName,
ProtocolAbbreviation: protocol.Abbreviation,
ProtocolVersion: item.Protocol.Version,
ProtocolBackgroundColor: protocol.BackgroundColor,
ProtocolForegroundColor: protocol.ForegroundColor,
ProtocolFontSize: protocol.FontSize,
ProtocolReferenceLink: protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}

View File

@@ -0,0 +1,209 @@
package main
import (
"bytes"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"github.com/beevik/etree"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
)
const maskedFieldPlaceholderValue = "[REDACTED]"
//these values MUST be all lower case and contain no `-` or `_` characters
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
"username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt",
"bearer", "clientid", "clientsecret", "redirecturi", "phonenumber",
"zip", "zipcode", "address", "country", "firstname", "lastname",
"middlename", "fname", "lname", "birthdate"}
func FilterSensitiveData(item *api.OutputChannelItem, options *api.TrafficFilteringOptions) {
request := item.Pair.Request.Payload.(HTTPPayload).Data.(*http.Request)
response := item.Pair.Response.Payload.(HTTPPayload).Data.(*http.Response)
filterHeaders(&request.Header)
filterHeaders(&response.Header)
filterUrl(request.URL)
filterRequestBody(request, options)
filterResponseBody(response, options)
}
func filterRequestBody(request *http.Request, options *api.TrafficFilteringOptions) {
contenType := getContentTypeHeaderValue(request.Header)
body, err := ioutil.ReadAll(request.Body)
if err != nil {
rlog.Debugf("Filtering error reading body: %v", err)
return
}
filteredBody, err := filterHttpBody([]byte(body), contenType, options)
if err == nil {
request.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody))
} else {
request.Body = ioutil.NopCloser(bytes.NewBuffer(body))
}
}
func filterResponseBody(response *http.Response, options *api.TrafficFilteringOptions) {
contentType := getContentTypeHeaderValue(response.Header)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
rlog.Debugf("Filtering error reading body: %v", err)
return
}
filteredBody, err := filterHttpBody([]byte(body), contentType, options)
if err == nil {
response.Body = ioutil.NopCloser(bytes.NewBuffer(filteredBody))
} else {
response.Body = ioutil.NopCloser(bytes.NewBuffer(body))
}
}
func filterHeaders(headers *http.Header) {
for key, _ := range *headers {
if strings.ToLower(key) == "cookie" {
headers.Del(key)
} else if isFieldNameSensitive(key) {
headers.Set(key, maskedFieldPlaceholderValue)
}
}
}
func getContentTypeHeaderValue(headers http.Header) string {
for key, _ := range headers {
if strings.ToLower(key) == "content-type" {
return headers.Get(key)
}
}
return ""
}
func isFieldNameSensitive(fieldName string) bool {
name := strings.ToLower(fieldName)
name = strings.ReplaceAll(name, "_", "")
name = strings.ReplaceAll(name, "-", "")
name = strings.ReplaceAll(name, " ", "")
for _, sensitiveField := range personallyIdentifiableDataFields {
if strings.Contains(name, sensitiveField) {
return true
}
}
return false
}
func filterHttpBody(bytes []byte, contentType string, options *api.TrafficFilteringOptions) ([]byte, error) {
mimeType := strings.Split(contentType, ";")[0]
switch strings.ToLower(mimeType) {
case "application/json":
return filterJsonBody(bytes)
case "text/html":
fallthrough
case "application/xhtml+xml":
fallthrough
case "text/xml":
fallthrough
case "application/xml":
return filterXmlEtree(bytes)
case "text/plain":
if options != nil && options.PlainTextMaskingRegexes != nil {
return filterPlainText(bytes, options), nil
}
}
return bytes, nil
}
func filterPlainText(bytes []byte, options *api.TrafficFilteringOptions) []byte {
for _, regex := range options.PlainTextMaskingRegexes {
bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue))
}
return bytes
}
func filterXmlEtree(bytes []byte) ([]byte, error) {
if !IsValidXML(bytes) {
return nil, errors.New("Invalid XML")
}
xmlDoc := etree.NewDocument()
err := xmlDoc.ReadFromBytes(bytes)
if err != nil {
return nil, err
} else {
filterXmlElement(xmlDoc.Root())
}
return xmlDoc.WriteToBytes()
}
func IsValidXML(data []byte) bool {
return xml.Unmarshal(data, new(interface{})) == nil
}
func filterXmlElement(element *etree.Element) {
for i, attribute := range element.Attr {
if isFieldNameSensitive(attribute.Key) {
element.Attr[i].Value = maskedFieldPlaceholderValue
}
}
if element.ChildElements() == nil || len(element.ChildElements()) == 0 {
if isFieldNameSensitive(element.Tag) {
element.SetText(maskedFieldPlaceholderValue)
}
} else {
for _, element := range element.ChildElements() {
filterXmlElement(element)
}
}
}
func filterJsonBody(bytes []byte) ([]byte, error) {
var bodyJsonMap map[string]interface{}
err := json.Unmarshal(bytes, &bodyJsonMap)
if err != nil {
return nil, err
}
filterJsonMap(bodyJsonMap)
return json.Marshal(bodyJsonMap)
}
func filterJsonMap(jsonMap map[string]interface{}) {
for key, value := range jsonMap {
// Do not replace nil values with maskedFieldPlaceholderValue
if value == nil {
continue
}
nestedMap, isNested := value.(map[string]interface{})
if isNested {
filterJsonMap(nestedMap)
} else {
if isFieldNameSensitive(key) {
jsonMap[key] = maskedFieldPlaceholderValue
}
}
}
}
func filterUrl(url *url.URL) {
if len(url.RawQuery) > 0 {
newQueryArgs := make([]string, 0)
for urlQueryParamName, urlQueryParamValues := range url.Query() {
newValues := urlQueryParamValues
if isFieldNameSensitive(urlQueryParamName) {
newValues = []string{maskedFieldPlaceholderValue}
}
for _, paramValue := range newValues {
newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue))
}
}
url.RawQuery = strings.Join(newQueryArgs, "&")
}
}

View File

@@ -39,7 +39,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
@@ -142,25 +142,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: _protocol.Name,
ProtocolVersion: _protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: _protocol.Name,
ProtocolLongName: _protocol.LongName,
ProtocolAbbreviation: _protocol.Abbreviation,
ProtocolVersion: _protocol.Version,
ProtocolBackgroundColor: _protocol.BackgroundColor,
ProtocolForegroundColor: _protocol.ForegroundColor,
ProtocolFontSize: _protocol.FontSize,
ProtocolReferenceLink: _protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}

View File

@@ -18,6 +18,7 @@ import (
"os"
"os/signal"
"runtime"
_debug "runtime/debug"
"runtime/pprof"
"strconv"
"strings"
@@ -63,7 +64,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
var memprofile = flag.String("memprofile", "", "Write memory profile")
var statsTracker = StatsTracker{}
var appStats = api.AppStats{}
// global
var stats struct {
@@ -92,9 +93,10 @@ var outputLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var nErrors uint
var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
var filteringOptions *api.TrafficFilteringOptions // global
const baseStreamChannelTimeoutMs int = 5000 * 100
@@ -152,17 +154,18 @@ type Context struct {
CaptureInfo gopacket.CaptureInfo
}
func GetStats() AppStats {
return statsTracker.appStats
func GetStats() api.AppStats {
return appStats
}
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension) {
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
hostMode = opts.HostMode
extensions = extensionsRef
filteringOptions = options
if GetMemoryProfilingEnabled() {
startMemoryProfiler()
@@ -215,18 +218,18 @@ func startMemoryProfiler() {
}
func closeTimedoutTcpStreamChannels() {
maxNumberOfGoroutines = GetMaxNumberOfGoroutines()
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
_debug.FreeOSMemory()
streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
stream.Close()
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
appStats.IncDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
@@ -328,10 +331,11 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
source.Lazy = *lazy
source.NoCopy = true
rlog.Info("Starting to read packets")
statsTracker.setStartTime(time.Now())
appStats.SetStartTime(time.Now())
defragger := ip4defrag.NewIPv4Defragmenter()
var emitter api.Emitter = &api.Emitting{
AppStats: &appStats,
OutputChannel: outputItems,
}
@@ -374,7 +378,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(statsTracker.appStats.StartTime),
time.Since(appStats.StartTime),
nErrors,
errorMapLen,
errorsSummery,
@@ -397,7 +401,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := statsTracker.dumpStats()
currentAppStats := appStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
log.Printf("app stats - %v", string(appStatsJSON))
}
@@ -415,10 +419,10 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
rlog.Debugf("Error:", err)
continue
}
packetsCount := statsTracker.incPacketsCount()
packetsCount := appStats.IncPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount)
data := packet.Data()
statsTracker.updateProcessedBytes(int64(len(data)))
appStats.UpdateProcessedBytes(uint64(len(data)))
if *hexdumppkt {
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
@@ -452,7 +456,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
statsTracker.incTcpPacketsCount()
appStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
@@ -470,15 +474,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
assemblerMutex.Unlock()
}
done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
if done {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
statsTracker.appStats.PacketsCount,
statsTracker.appStats.ProcessedBytes,
time.Since(statsTracker.appStats.StartTime),
appStats.PacketsCount,
appStats.ProcessedBytes,
time.Since(appStats.StartTime),
nErrors,
errorMapLen)
}

View File

@@ -13,11 +13,9 @@ const (
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
MaxNumberOfGoroutinesEnvVarName = "MAX_NUMBER_OF_GOROUTINES"
MaxBufferedPagesTotalDefaultValue = 5000
MaxBufferedPagesPerConnectionDefaultValue = 5000
TcpStreamChannelTimeoutMsDefaultValue = 5000
MaxNumberOfGoroutinesDefaultValue = 4000
TcpStreamChannelTimeoutMsDefaultValue = 10000
)
type globalSettings struct {
@@ -62,14 +60,6 @@ func GetTcpChannelTimeoutMs() time.Duration {
return time.Duration(valueFromEnv) * time.Millisecond
}
func GetMaxNumberOfGoroutines() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxNumberOfGoroutinesEnvVarName))
if err != nil {
return MaxNumberOfGoroutinesDefaultValue
}
return valueFromEnv
}
func GetMemoryProfilingEnabled() bool {
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
}

View File

@@ -1,117 +0,0 @@
package tap
import (
"sync"
"time"
)
type AppStats struct {
StartTime time.Time `json:"-"`
ProcessedBytes int64 `json:"processedBytes"`
PacketsCount int64 `json:"packetsCount"`
TcpPacketsCount int64 `json:"tcpPacketsCount"`
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
MatchedPairs int64 `json:"matchedPairs"`
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
}
type StatsTracker struct {
appStats AppStats
processedBytesMutex sync.Mutex
packetsCountMutex sync.Mutex
tcpPacketsCountMutex sync.Mutex
reassembledTcpPayloadsCountMutex sync.Mutex
tlsConnectionsCountMutex sync.Mutex
matchedPairsMutex sync.Mutex
droppedTcpStreamsMutex sync.Mutex
}
func (st *StatsTracker) incMatchedPairs() {
st.matchedPairsMutex.Lock()
st.appStats.MatchedPairs++
st.matchedPairsMutex.Unlock()
}
func (st *StatsTracker) incDroppedTcpStreams() {
st.droppedTcpStreamsMutex.Lock()
st.appStats.DroppedTcpStreams++
st.droppedTcpStreamsMutex.Unlock()
}
func (st *StatsTracker) incPacketsCount() int64 {
st.packetsCountMutex.Lock()
st.appStats.PacketsCount++
currentPacketsCount := st.appStats.PacketsCount
st.packetsCountMutex.Unlock()
return currentPacketsCount
}
func (st *StatsTracker) incTcpPacketsCount() {
st.tcpPacketsCountMutex.Lock()
st.appStats.TcpPacketsCount++
st.tcpPacketsCountMutex.Unlock()
}
func (st *StatsTracker) incReassembledTcpPayloadsCount() {
st.reassembledTcpPayloadsCountMutex.Lock()
st.appStats.ReassembledTcpPayloadsCount++
st.reassembledTcpPayloadsCountMutex.Unlock()
}
func (st *StatsTracker) incTlsConnectionsCount() {
st.tlsConnectionsCountMutex.Lock()
st.appStats.TlsConnectionsCount++
st.tlsConnectionsCountMutex.Unlock()
}
func (st *StatsTracker) updateProcessedBytes(size int64) {
st.processedBytesMutex.Lock()
st.appStats.ProcessedBytes += size
st.processedBytesMutex.Unlock()
}
func (st *StatsTracker) setStartTime(startTime time.Time) {
st.appStats.StartTime = startTime
}
func (st *StatsTracker) dumpStats() *AppStats {
currentAppStats := &AppStats{StartTime: st.appStats.StartTime}
st.processedBytesMutex.Lock()
currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes
st.appStats.ProcessedBytes = 0
st.processedBytesMutex.Unlock()
st.packetsCountMutex.Lock()
currentAppStats.PacketsCount = st.appStats.PacketsCount
st.appStats.PacketsCount = 0
st.packetsCountMutex.Unlock()
st.tcpPacketsCountMutex.Lock()
currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount
st.appStats.TcpPacketsCount = 0
st.tcpPacketsCountMutex.Unlock()
st.reassembledTcpPayloadsCountMutex.Lock()
currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount
st.appStats.ReassembledTcpPayloadsCount = 0
st.reassembledTcpPayloadsCountMutex.Unlock()
st.tlsConnectionsCountMutex.Lock()
currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount
st.appStats.TlsConnectionsCount = 0
st.tlsConnectionsCountMutex.Unlock()
st.matchedPairsMutex.Lock()
currentAppStats.MatchedPairs = st.appStats.MatchedPairs
st.appStats.MatchedPairs = 0
st.matchedPairsMutex.Unlock()
st.droppedTcpStreamsMutex.Lock()
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
st.appStats.DroppedTcpStreams = 0
st.droppedTcpStreamsMutex.Unlock()
return currentAppStats
}

View File

@@ -107,7 +107,7 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
if err != nil {
io.Copy(ioutil.Discard, b)
}

View File

@@ -147,7 +147,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if length > 0 {
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
appStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for i := range t.clients {

View File

@@ -2,7 +2,6 @@ package tap
import (
"fmt"
"runtime"
"sync"
"time"
@@ -33,8 +32,6 @@ type tcpStreamWrapper struct {
var streams *sync.Map = &sync.Map{} // global
var streamId int64 = 0
var maxNumberOfGoroutines int
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
rlog.Debugf("* NEW: %s %s", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
@@ -61,11 +58,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
superIdentifier: &api.SuperIdentifier{},
}
if stream.isTapTarget {
if runtime.NumGoroutine() > maxNumberOfGoroutines {
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
return stream
}
streamId++
stream.id = streamId
for i, extension := range extensions {

22792
ui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -12,8 +12,8 @@
"@types/node": "^12.20.10",
"@types/react": "^17.0.3",
"@types/react-dom": "^17.0.3",
"jsonpath": "^1.1.1",
"axios": "^0.21.1",
"jsonpath": "^1.1.1",
"node-sass": "^5.0.0",
"numeral": "^2.0.6",
"protobuf-decoder": "^0.1.0",
@@ -21,7 +21,7 @@
"react-copy-to-clipboard": "^5.0.3",
"react-dom": "^17.0.2",
"react-scripts": "4.0.3",
"react-scrollable-feed": "^1.3.0",
"react-scrollable-feed-virtualized": "^1.4.2",
"react-syntax-highlighter": "^15.4.3",
"typescript": "^4.2.4",
"web-vitals": "^1.1.1"

View File

@@ -2,7 +2,7 @@ import {EntryItem} from "./EntryListItem/EntryListItem";
import React, {useCallback, useEffect, useMemo, useRef, useState} from "react";
import styles from './style/EntriesList.module.sass';
import spinner from './assets/spinner.svg';
import ScrollableFeed from "react-scrollable-feed";
import ScrollableFeedVirtualized from "react-scrollable-feed-virtualized";
import {StatusType} from "./Filters";
import Api from "../helpers/api";
import down from "./assets/downImg.svg";
@@ -77,9 +77,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
}
setIsLoadingTop(false);
const newEntries = [...data, ...entries];
if(newEntries.length >= 1000) {
newEntries.splice(1000);
}
setEntries(newEntries);
if(scrollTo) {
@@ -100,10 +97,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
}
scrollTo = document.getElementById(filteredEntries?.[filteredEntries.length -1]?.id);
let newEntries = [...entries, ...data];
if(newEntries.length >= 1000) {
setNoMoreDataTop(false);
newEntries = newEntries.slice(-1000);
}
setEntries(newEntries);
if(scrollTo) {
scrollTo.scrollIntoView({behavior: "smooth"});
@@ -116,19 +109,20 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
{isLoadingTop && <div className={styles.spinnerContainer}>
<img alt="spinner" src={spinner} style={{height: 25}}/>
</div>}
<ScrollableFeed ref={scrollableRef} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
{noMoreDataTop && !connectionOpen && <div id="noMoreDataTop" className={styles.noMoreDataAvailable}>No more data available</div>}
{filteredEntries.map(entry => <EntryItem key={entry.id}
entry={entry}
setFocusedEntryId={setFocusedEntryId}
isSelected={focusedEntryId === entry.id}/>)}
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
</div>}
</ScrollableFeed>
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.scrollToBottom()}>
isSelected={focusedEntryId === entry.id}
style={{}}/>)}
</ScrollableFeedVirtualized>
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
</div>}
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.jumpToBottom()}>
<img alt="down" src={down} />
</button>
</div>

View File

@@ -42,7 +42,6 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
<div style={{right: "30px", position: "absolute", display: "flex"}}>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
<div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div>
</div>
</div>;
};
@@ -72,7 +71,7 @@ export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
/>
{entryData.data && <EntrySummary data={entryData.data}/>}
<>
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.backgroundColor}/>}
{entryData.data && <EntryViewer representation={entryData.representation} rulesMatched={entryData.rulesMatched} elapsedTime={entryData.data.elapsedTime} color={entryData.protocol.backgroundColor}/>}
</>
</>
};

View File

@@ -215,10 +215,10 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({serv
<>
{
rule.Key &&
<tr className={styles.dataValue}><td><b>Key</b>:</td><td>{rule.Key}</td></tr>
<tr className={styles.dataValue}><td><b>Key:</b></td> <td>{rule.Key}</td></tr>
}
{
rule.Latency &&
rule.Latency !== 0 &&
<tr className={styles.dataValue}><td><b>Latency:</b></td> <td>{rule.Latency}</td></tr>
}
{
@@ -231,7 +231,7 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({serv
}
{
rule.Service &&
<tr className={styles.dataValue}><td><b>Service:</b></td> <td>{service}</td></tr>
<tr className={styles.dataValue}><td><b>Service:</b></td> <td>{rule.Service}</td></tr>
}
{
rule.Type &&
@@ -251,7 +251,7 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({serv
</tbody>
</table>
</EntrySectionContainer>
</> : <span/>
</> : <span className={styles.noRules}>No rules could be applied to this request.</span>
}
</React.Fragment>
}

View File

@@ -33,8 +33,7 @@ const SectionsRepresentation: React.FC<any> = ({data, color}) => {
return <>{sections}</>;
}
const AutoRepresentation: React.FC<any> = ({representation, color}) => {
const rulesMatched = []
const AutoRepresentation: React.FC<any> = ({representation, rulesMatched, elapsedTime, color}) => {
const TABS = [
{
tab: 'request'
@@ -68,8 +67,7 @@ const AutoRepresentation: React.FC<any> = ({representation, color}) => {
<SectionsRepresentation data={response} color={color}/>
</React.Fragment>}
{currentTab === TABS[2].tab && <React.Fragment>
{// FIXME: Fix here
<EntryTablePolicySection service={representation.service} title={'Rule'} color={color} latency={0} response={response} arrayToIterate={rulesMatched ? rulesMatched : []}/>}
<EntryTablePolicySection service={representation.service} title={'Rule'} color={color} latency={elapsedTime} response={response} arrayToIterate={rulesMatched ? rulesMatched : []}/>
</React.Fragment>}
</div>}
</div>;
@@ -77,11 +75,13 @@ const AutoRepresentation: React.FC<any> = ({representation, color}) => {
interface Props {
representation: any;
color: string,
rulesMatched: any;
color: string;
elapsedTime: number;
}
const EntryViewer: React.FC<Props> = ({representation, color}) => {
return <AutoRepresentation representation={representation} color={color}/>
const EntryViewer: React.FC<Props> = ({representation, rulesMatched, elapsedTime, color}) => {
return <AutoRepresentation representation={representation} rulesMatched={rulesMatched} elapsedTime={elapsedTime} color={color}/>
};
export default EntryViewer;

View File

@@ -19,20 +19,31 @@
.rowSelected
border: 1px $blue-color solid
// border-left: 5px $blue-color solid
margin-left: 10px
margin-right: 3px
.ruleSuccessRow
border: 1px $success-color solid
// border-left: 5px $success-color solid
background: #E8FFF1
.ruleSuccessRowSelected
border: 1px #6FCF97 solid
border-left: 5px #6FCF97 solid
.ruleFailureRow
background: #FFE9EF
.ruleFailureRowSelected
border: 1px $failure-color solid
// border-left: 5px $failure-color solid
border-left: 5px $failure-color solid
.ruleNumberText
font-size: 12px;
font-style: italic;
.ruleNumberTextFailure
color: #DB2156
.ruleNumberTextSuccess
color: #219653
.service
text-overflow: ellipsis

View File

@@ -23,7 +23,7 @@ interface Entry {
sourcePort: string,
destinationIp: string,
destinationPort: string,
isOutgoing?: boolean;
isOutgoing?: boolean;
latency: number;
rules: Rules;
}
@@ -38,10 +38,12 @@ interface EntryProps {
entry: Entry;
setFocusedEntryId: (id: string) => void;
isSelected?: boolean;
style: object;
}
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected}) => {
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected, style}) => {
const classification = getClassification(entry.statusCode)
const numberOfRules = entry.rules.numberOfRules
let ingoingIcon;
let outgoingIcon;
switch(classification) {
@@ -61,49 +63,47 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
break;
}
}
// let additionalRulesProperties = "";
// let ruleSuccess: boolean;
let additionalRulesProperties = "";
let ruleSuccess: boolean;
let rule = 'latency' in entry.rules
if (rule) {
if (entry.rules.latency !== -1) {
if (entry.rules.latency >= entry.latency) {
// additionalRulesProperties = styles.ruleSuccessRow
// ruleSuccess = true
additionalRulesProperties = styles.ruleSuccessRow
ruleSuccess = true
} else {
// additionalRulesProperties = styles.ruleFailureRow
// ruleSuccess = false
additionalRulesProperties = styles.ruleFailureRow
ruleSuccess = false
}
if (isSelected) {
// additionalRulesProperties += ` ${entry.rules.latency >= entry.latency ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
additionalRulesProperties += ` ${entry.rules.latency >= entry.latency ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
}
} else {
if (entry.rules.status) {
// additionalRulesProperties = styles.ruleSuccessRow
// ruleSuccess = true
additionalRulesProperties = styles.ruleSuccessRow
ruleSuccess = true
} else {
// additionalRulesProperties = styles.ruleFailureRow
// ruleSuccess = false
additionalRulesProperties = styles.ruleFailureRow
ruleSuccess = false
}
if (isSelected) {
// additionalRulesProperties += ` ${entry.rules.status ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
additionalRulesProperties += ` ${entry.rules.status ? styles.ruleSuccessRowSelected : styles.ruleFailureRowSelected}`
}
}
}
let backgroundColor = "";
if ('latency' in entry.rules) {
if (entry.rules.latency !== -1) {
backgroundColor = entry.rules.latency >= entry.latency ? styles.ruleSuccessRow : styles.ruleFailureRow
} else {
backgroundColor = entry.rules.status ? styles.ruleSuccessRow : styles.ruleFailureRow
}
}
return <>
<div
id={entry.id}
className={`${styles.row}
${isSelected ? styles.rowSelected : backgroundColor}`}
${isSelected && !rule ? styles.rowSelected : additionalRulesProperties}`}
onClick={() => setFocusedEntryId(entry.id)}
style={{border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid"}}
style={{
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
position: "absolute",
top: style['top'],
marginTop: style['marginTop'],
width: "calc(100% - 25px)",
}}
>
<Protocol protocol={entry.protocol} horizontal={false}/>
{((entry.protocol.name === "http" && "statusCode" in entry) || entry.statusCode !== 0) && <div>
@@ -115,6 +115,13 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
<span title="Service Name">{entry.service}</span>
</div>
</div>
{
rule ?
<div className={`${styles.ruleNumberText} ${ruleSuccess ? styles.ruleNumberTextSuccess : styles.ruleNumberTextFailure}`}>
{`Rules (${numberOfRules})`}
</div>
: ""
}
<div className={styles.directionContainer}>
<span className={styles.port} title="Source Port">{entry.sourcePort}</span>
{entry.isOutgoing ?
@@ -131,4 +138,5 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
</div>
</div>
</>
};
}

View File

@@ -86,10 +86,6 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
}
if (!focusedEntryId) setFocusedEntryId(entry.id)
let newEntries = [...entries];
if (entries.length === 1000) {
newEntries = newEntries.splice(1);
setNoMoreDataTop(false);
}
setEntries([...newEntries, entry])
if(listEntry.current) {
if(isScrollable(listEntry.current.firstChild)) {

View File

@@ -14,7 +14,6 @@
flex-direction: column
overflow: hidden
flex-grow: 1
padding-top: 20px
.footer
display: flex