mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-03-09 14:12:16 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7cc077c8a0 | ||
|
|
fae5f22d25 | ||
|
|
eba7a3b476 | ||
|
|
cf231538f4 | ||
|
|
073b0b72d3 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -26,3 +26,6 @@ build
|
||||
|
||||
# Environment variables
|
||||
.env
|
||||
|
||||
# pprof
|
||||
pprof/*
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os/exec"
|
||||
"strings"
|
||||
@@ -12,7 +11,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTapAndFetch(t *testing.T) {
|
||||
func TestTap(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("ignored acceptance test")
|
||||
}
|
||||
@@ -93,37 +92,6 @@ func TestTapAndFetch(t *testing.T) {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
fetchCmdArgs := getDefaultFetchCommandArgs()
|
||||
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
|
||||
t.Logf("running command: %v", fetchCmd.String())
|
||||
|
||||
if err := fetchCmd.Start(); err != nil {
|
||||
t.Errorf("failed to start fetch command, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
harCheckFunc := func() error {
|
||||
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
|
||||
if readFileErr != nil {
|
||||
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
|
||||
}
|
||||
|
||||
harEntries, err := getEntriesFromHarBytes(harBytes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get entries from har, err: %v", err)
|
||||
}
|
||||
|
||||
if len(harEntries) == 0 {
|
||||
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
if err := retriesExecute(shortRetriesCount, harCheckFunc); err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,13 +76,6 @@ func getDefaultTapNamespace() []string {
|
||||
return []string{"-n", "mizu-tests"}
|
||||
}
|
||||
|
||||
func getDefaultFetchCommandArgs() []string {
|
||||
fetchCommand := "fetch"
|
||||
defaultCmdArgs := getDefaultCommandArgs()
|
||||
|
||||
return append([]string{fetchCommand}, defaultCmdArgs...)
|
||||
}
|
||||
|
||||
func getDefaultConfigCommandArgs() []string {
|
||||
configCommand := "config"
|
||||
defaultCmdArgs := getDefaultCommandArgs()
|
||||
@@ -91,10 +84,10 @@ func getDefaultConfigCommandArgs() []string {
|
||||
}
|
||||
|
||||
func retriesExecute(retriesCount int, executeFunc func() error) error {
|
||||
var lastError error
|
||||
var lastError interface{}
|
||||
|
||||
for i := 0; i < retriesCount; i++ {
|
||||
if err := executeFunc(); err != nil {
|
||||
if err := tryExecuteFunc(executeFunc); err != nil {
|
||||
lastError = err
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -107,6 +100,16 @@ func retriesExecute(retriesCount int, executeFunc func() error) error {
|
||||
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
|
||||
}
|
||||
|
||||
func tryExecuteFunc(executeFunc func() error) (err interface{}) {
|
||||
defer func() {
|
||||
if panicErr := recover(); panicErr != nil {
|
||||
err = panicErr
|
||||
}
|
||||
}()
|
||||
|
||||
return executeFunc()
|
||||
}
|
||||
|
||||
func waitTapPodsReady(apiServerUrl string) error {
|
||||
resolvingUrl := fmt.Sprintf("%v/status/tappersCount", apiServerUrl)
|
||||
tapPodsReadyFunc := func() error {
|
||||
@@ -179,19 +182,6 @@ func cleanupCommand(cmd *exec.Cmd) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error) {
|
||||
harInterface, convertErr := jsonBytesToInterface(harBytes)
|
||||
if convertErr != nil {
|
||||
return nil, convertErr
|
||||
}
|
||||
|
||||
har := harInterface.(map[string]interface{})
|
||||
harLog := har["log"].(map[string]interface{})
|
||||
harEntries := harLog["entries"].([]interface{})
|
||||
|
||||
return harEntries, nil
|
||||
}
|
||||
|
||||
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
|
||||
tapStatus := tapStatusInterface.(map[string]interface{})
|
||||
podsInterface := tapStatus["pods"].([]interface{})
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
# mizu agent
|
||||
Agent for MIZU (API server and tapper)
|
||||
Basic APIs:
|
||||
* /fetch - retrieve traffic data
|
||||
* /stats - retrieve statistics of collected data
|
||||
* /viewer - web ui
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package controllers
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/martian/har"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/providers"
|
||||
@@ -10,11 +11,9 @@ import (
|
||||
"mizuserver/pkg/utils"
|
||||
"mizuserver/pkg/validation"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/martian/har"
|
||||
"github.com/romana/rlog"
|
||||
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
@@ -64,93 +63,6 @@ func GetEntries(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, baseEntries)
|
||||
}
|
||||
|
||||
func GetHARs(c *gin.Context) {
|
||||
entriesFilter := &models.HarFetchRequestQuery{}
|
||||
order := database.OrderDesc
|
||||
if err := c.BindQuery(entriesFilter); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
}
|
||||
err := validation.Validate(entriesFilter)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
}
|
||||
|
||||
var timestampFrom, timestampTo int64
|
||||
|
||||
if entriesFilter.From < 0 {
|
||||
timestampFrom = 0
|
||||
} else {
|
||||
timestampFrom = entriesFilter.From
|
||||
}
|
||||
if entriesFilter.To <= 0 {
|
||||
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
} else {
|
||||
timestampTo = entriesFilter.To
|
||||
}
|
||||
|
||||
var entries []tapApi.MizuEntry
|
||||
database.GetEntriesTable().
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 {
|
||||
// the entries always order from oldest to newest - we should reverse
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
|
||||
harsObject := map[string]*models.ExtendedHAR{}
|
||||
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
if entryData.ResolvedDestination != "" {
|
||||
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
|
||||
}
|
||||
|
||||
var fileName string
|
||||
sourceOfEntry := entryData.ResolvedSource
|
||||
if sourceOfEntry != "" {
|
||||
// naively assumes the proper service source is http
|
||||
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
|
||||
//replace / from the file name because they end up creating a corrupted folder
|
||||
fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
|
||||
} else {
|
||||
fileName = "unknown_source.har"
|
||||
}
|
||||
if harOfSource, ok := harsObject[fileName]; ok {
|
||||
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
|
||||
} else {
|
||||
var entriesHar []*har.Entry
|
||||
entriesHar = append(entriesHar, &harEntry)
|
||||
harsObject[fileName] = &models.ExtendedHAR{
|
||||
Log: &models.ExtendedLog{
|
||||
Version: "1.2",
|
||||
Creator: &models.ExtendedCreator{
|
||||
Creator: &har.Creator{
|
||||
Name: "mizu",
|
||||
Version: "0.0.2",
|
||||
},
|
||||
},
|
||||
Entries: entriesHar,
|
||||
},
|
||||
}
|
||||
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
|
||||
if sourceOfEntry != "" {
|
||||
harsObject[fileName].Log.Creator.Source = &sourceOfEntry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
retObj := map[string][]byte{}
|
||||
for k, v := range harsObject {
|
||||
bytesData, _ := json.Marshal(v)
|
||||
retObj[k] = bytesData
|
||||
}
|
||||
buffer := utils.ZipData(retObj)
|
||||
c.Data(http.StatusOK, "application/octet-stream", buffer.Bytes())
|
||||
}
|
||||
|
||||
func UploadEntries(c *gin.Context) {
|
||||
rlog.Infof("Upload entries - started\n")
|
||||
|
||||
|
||||
@@ -15,8 +15,6 @@ func EntriesRoutes(ginApp *gin.Engine) {
|
||||
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
|
||||
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
|
||||
|
||||
routeGroup.GET("/har", controllers.GetHARs)
|
||||
|
||||
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
|
||||
routeGroup.GET("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -131,28 +130,6 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
|
||||
return generalStats, nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
|
||||
if !provider.isReady {
|
||||
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed getting har from api server %w", err)
|
||||
}
|
||||
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed reading hars %w", err)
|
||||
}
|
||||
|
||||
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed craeting zip reader %w", err)
|
||||
}
|
||||
return zipReader, nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetVersion() (string, error) {
|
||||
if !provider.isReady {
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/config/configStructs"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu/version"
|
||||
"github.com/up9inc/mizu/cli/telemetry"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
)
|
||||
|
||||
var fetchCmd = &cobra.Command{
|
||||
Use: "fetch",
|
||||
Short: "Download recorded traffic to files",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
go telemetry.ReportRun("fetch", config.Config.Fetch)
|
||||
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
|
||||
return nil
|
||||
}
|
||||
|
||||
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
|
||||
return err
|
||||
} else if !isCompatible {
|
||||
return nil
|
||||
}
|
||||
RunMizuFetch()
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(fetchCmd)
|
||||
|
||||
defaultFetchConfig := configStructs.FetchConfig{}
|
||||
defaults.Set(&defaultFetchConfig)
|
||||
|
||||
fetchCmd.Flags().StringP(configStructs.DirectoryFetchName, "d", defaultFetchConfig.Directory, "Provide a custom directory for fetched entries")
|
||||
fetchCmd.Flags().Int(configStructs.FromTimestampFetchName, defaultFetchConfig.FromTimestamp, "Custom start timestamp for fetched entries")
|
||||
fetchCmd.Flags().Int(configStructs.ToTimestampFetchName, defaultFetchConfig.ToTimestamp, "Custom end timestamp fetched entries")
|
||||
fetchCmd.Flags().Uint16P(configStructs.GuiPortFetchName, "p", defaultFetchConfig.GuiPort, "Provide a custom port for the web interface webserver")
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
)
|
||||
|
||||
func RunMizuFetch() {
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
||||
}
|
||||
|
||||
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed fetch data from API server %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed unzip %v", err)
|
||||
}
|
||||
}
|
||||
@@ -64,7 +64,6 @@ func init() {
|
||||
tapCmd.Flags().StringSliceP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
|
||||
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
|
||||
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
|
||||
tapCmd.Flags().String(configStructs.DirectionTapName, defaultTapConfig.Direction, "Record traffic that goes in this direction (relative to the tapped pod): in/any")
|
||||
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ const (
|
||||
|
||||
type ConfigStruct struct {
|
||||
Tap configStructs.TapConfig `yaml:"tap"`
|
||||
Fetch configStructs.FetchConfig `yaml:"fetch"`
|
||||
Version configStructs.VersionConfig `yaml:"version"`
|
||||
View configStructs.ViewConfig `yaml:"view"`
|
||||
Logs configStructs.LogsConfig `yaml:"logs"`
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package configStructs
|
||||
|
||||
const (
|
||||
DirectoryFetchName = "directory"
|
||||
FromTimestampFetchName = "from"
|
||||
ToTimestampFetchName = "to"
|
||||
GuiPortFetchName = "gui-port"
|
||||
)
|
||||
|
||||
type FetchConfig struct {
|
||||
Directory string `yaml:"directory" default:"."`
|
||||
FromTimestamp int `yaml:"from" default:"0"`
|
||||
ToTimestamp int `yaml:"to" default:"0"`
|
||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||
}
|
||||
@@ -3,10 +3,8 @@ package configStructs
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/shared/units"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -17,7 +15,6 @@ const (
|
||||
PlainTextFilterRegexesTapName = "regex-masking"
|
||||
DisableRedactionTapName = "no-redact"
|
||||
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
|
||||
DirectionTapName = "direction"
|
||||
DryRunTapName = "dry-run"
|
||||
EnforcePolicyFile = "test-rules"
|
||||
)
|
||||
@@ -34,7 +31,6 @@ type TapConfig struct {
|
||||
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
|
||||
DisableRedaction bool `yaml:"no-redact" default:"false"`
|
||||
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
|
||||
Direction string `yaml:"direction" default:"in"`
|
||||
DryRun bool `yaml:"dry-run" default:"false"`
|
||||
EnforcePolicyFile string `yaml:"test-rules"`
|
||||
ApiServerResources Resources `yaml:"api-server-resources"`
|
||||
@@ -69,10 +65,5 @@ func (config *TapConfig) Validate() error {
|
||||
return errors.New(fmt.Sprintf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize))
|
||||
}
|
||||
|
||||
directionLowerCase := strings.ToLower(config.Direction)
|
||||
if directionLowerCase != "any" && directionLowerCase != "in" {
|
||||
return errors.New(fmt.Sprintf("%s is not a valid value for flag --%s. Acceptable values are in/any.", config.Direction, DirectionTapName))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
||||
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
|
||||
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
|
||||
)
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-github/v37/github"
|
||||
@@ -76,7 +77,7 @@ func CheckNewerVersion(versionChan chan string) {
|
||||
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
|
||||
|
||||
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
|
||||
versionChan <- fmt.Sprintf("Update available! %v -> %v (%v)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL)
|
||||
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL, runtime.GOOS)
|
||||
} else {
|
||||
versionChan <- ""
|
||||
}
|
||||
|
||||
@@ -8,4 +8,5 @@ const (
|
||||
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
|
||||
RulePolicyPath = "/app/enforce-policy/"
|
||||
RulePolicyFileName = "enforce-policy.yaml"
|
||||
GoGCEnvVar = "GOGC"
|
||||
)
|
||||
|
||||
@@ -132,6 +132,7 @@ type BaseEntryDetails struct {
|
||||
Url string `json:"url,omitempty"`
|
||||
RequestSenderIp string `json:"request_sender_ip,omitempty"`
|
||||
Service string `json:"service,omitempty"`
|
||||
Path string `json:"path,omitempty"`
|
||||
Summary string `json:"summary,omitempty"`
|
||||
StatusCode int `json:"status_code"`
|
||||
Method string `json:"method,omitempty"`
|
||||
|
||||
@@ -82,9 +82,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
if err == io.EOF {
|
||||
// We must read until we see an EOF... very important!
|
||||
return errors.New("AMQP EOF")
|
||||
} else if err != nil {
|
||||
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
|
||||
return err
|
||||
}
|
||||
|
||||
switch f := frame.(type) {
|
||||
@@ -101,6 +98,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
case *BasicDeliver:
|
||||
eventBasicDeliver.Properties = header.Properties
|
||||
default:
|
||||
frame = nil
|
||||
}
|
||||
|
||||
case *BodyFrame:
|
||||
@@ -115,6 +113,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
eventBasicDeliver.Body = f.Body
|
||||
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
default:
|
||||
body = nil
|
||||
frame = nil
|
||||
}
|
||||
|
||||
case *MethodFrame:
|
||||
@@ -200,6 +200,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
default:
|
||||
frame = nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
|
||||
channel := binary.BigEndian.Uint16(scratch[1:3])
|
||||
size := binary.BigEndian.Uint32(scratch[3:7])
|
||||
|
||||
if size > 1000000 {
|
||||
if size > 1000000*16 {
|
||||
return nil, ErrMaxSize
|
||||
}
|
||||
|
||||
@@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
|
||||
return
|
||||
}
|
||||
|
||||
if hf.Size > 512 {
|
||||
return nil, ErrMaxHeaderFrameSize
|
||||
}
|
||||
|
||||
var flags uint16
|
||||
|
||||
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
|
||||
@@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
|
||||
}
|
||||
|
||||
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
|
||||
bf.Body = nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
@@ -19,32 +18,35 @@ import (
|
||||
// ErrCredentials. The text of the error is likely more interesting than
|
||||
// these constants.
|
||||
const (
|
||||
frameMethod = 1
|
||||
frameHeader = 2
|
||||
frameBody = 3
|
||||
frameHeartbeat = 8
|
||||
frameMinSize = 4096
|
||||
frameEnd = 206
|
||||
replySuccess = 200
|
||||
ContentTooLarge = 311
|
||||
NoRoute = 312
|
||||
NoConsumers = 313
|
||||
ConnectionForced = 320
|
||||
InvalidPath = 402
|
||||
AccessRefused = 403
|
||||
NotFound = 404
|
||||
ResourceLocked = 405
|
||||
PreconditionFailed = 406
|
||||
FrameError = 501
|
||||
SyntaxError = 502
|
||||
CommandInvalid = 503
|
||||
ChannelError = 504
|
||||
UnexpectedFrame = 505
|
||||
ResourceError = 506
|
||||
NotAllowed = 530
|
||||
NotImplemented = 540
|
||||
InternalError = 541
|
||||
MaxSizeError = 551
|
||||
frameMethod = 1
|
||||
frameHeader = 2
|
||||
frameBody = 3
|
||||
frameHeartbeat = 8
|
||||
frameMinSize = 4096
|
||||
frameEnd = 206
|
||||
replySuccess = 200
|
||||
ContentTooLarge = 311
|
||||
NoRoute = 312
|
||||
NoConsumers = 313
|
||||
ConnectionForced = 320
|
||||
InvalidPath = 402
|
||||
AccessRefused = 403
|
||||
NotFound = 404
|
||||
ResourceLocked = 405
|
||||
PreconditionFailed = 406
|
||||
FrameError = 501
|
||||
SyntaxError = 502
|
||||
CommandInvalid = 503
|
||||
ChannelError = 504
|
||||
UnexpectedFrame = 505
|
||||
ResourceError = 506
|
||||
NotAllowed = 530
|
||||
NotImplemented = 540
|
||||
InternalError = 541
|
||||
MaxSizeError = 551
|
||||
MaxHeaderFrameSizeError = 552
|
||||
BadMethodFrameUnknownMethod = 601
|
||||
BadMethodFrameUnknownClass = 602
|
||||
)
|
||||
|
||||
func isSoftExceptionCode(code int) bool {
|
||||
@@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 20: // channel
|
||||
@@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 40: // exchange
|
||||
@@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 50: // queue
|
||||
@@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 60: // basic
|
||||
@@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 90: // tx
|
||||
@@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 85: // confirm
|
||||
@@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownClass
|
||||
}
|
||||
|
||||
return mf, nil
|
||||
|
||||
@@ -60,8 +60,13 @@ var (
|
||||
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
|
||||
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
|
||||
|
||||
// ErrClosed is returned when the channel or connection is not open
|
||||
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
|
||||
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
|
||||
|
||||
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
|
||||
|
||||
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
|
||||
|
||||
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
|
||||
)
|
||||
|
||||
// Error captures the code and reason a channel or connection has been closed
|
||||
|
||||
@@ -201,6 +201,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
Url: entry.Url,
|
||||
RequestSenderIp: entry.RequestSenderIp,
|
||||
Service: entry.Service,
|
||||
Path: entry.Path,
|
||||
Summary: entry.Path,
|
||||
StatusCode: entry.Status,
|
||||
Method: entry.Method,
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -168,11 +169,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
|
||||
}
|
||||
|
||||
func startMemoryProfiler() {
|
||||
dirname := "/app/pprof"
|
||||
rlog.Info("Profiling is on, results will be written to %s", dirname)
|
||||
dumpPath := "/app/pprof"
|
||||
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
|
||||
if envDumpPath != "" {
|
||||
dumpPath = envDumpPath
|
||||
}
|
||||
timeInterval := 60
|
||||
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
|
||||
if envTimeInterval != "" {
|
||||
if i, err := strconv.Atoi(envTimeInterval); err == nil {
|
||||
timeInterval = i
|
||||
}
|
||||
}
|
||||
|
||||
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
|
||||
go func() {
|
||||
if _, err := os.Stat(dirname); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dirname, 0777); err != nil {
|
||||
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dumpPath, 0777); err != nil {
|
||||
log.Fatal("could not create directory for profile: ", err)
|
||||
}
|
||||
}
|
||||
@@ -180,9 +193,9 @@ func startMemoryProfiler() {
|
||||
for true {
|
||||
t := time.Now()
|
||||
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
|
||||
|
||||
rlog.Info("Writing memory profile to %s\n", filename)
|
||||
rlog.Infof("Writing memory profile to %s\n", filename)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
@@ -193,7 +206,7 @@ func startMemoryProfiler() {
|
||||
log.Fatal("could not write memory profile: ", err)
|
||||
}
|
||||
_ = f.Close()
|
||||
time.Sleep(time.Minute)
|
||||
time.Sleep(time.Second * time.Duration(timeInterval))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
|
||||
const (
|
||||
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
|
||||
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
|
||||
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
|
||||
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
|
||||
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
|
||||
MaxBufferedPagesTotalDefaultValue = 5000
|
||||
|
||||
@@ -54,8 +54,8 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
|
||||
const filterEntries = useCallback((entry) => {
|
||||
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
|
||||
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
|
||||
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
|
||||
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
|
||||
if(statusFilter.includes(StatusType.SUCCESS) && entry.status_code >= 400) return;
|
||||
if(statusFilter.includes(StatusType.ERROR) && entry.status_code < 400) return;
|
||||
return entry;
|
||||
},[methodsFilter, pathFilter, statusFilter])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user