Compare commits

..

5 Commits

Author SHA1 Message Date
M. Mert Yıldıran
7cc077c8a0 Fix the memory exhaustion by optimizing max. AMQP message size and GOGC (#257)
* Permanently resolve the memory exhaustion in AMQP

Introduce;
- `MEMORY_PROFILING_DUMP_PATH`
- `MEMORY_PROFILING_TIME_INTERVAL`
environment variables and make `startMemoryProfiler` method more parameterized.

* Fix a leak in HTTP

* Revert "Fix a leak in HTTP"

This reverts commit 9d46820ff3.

* Set maximum AMQP message size to 16MB

* Set `GOGC` to 12800

* Remove some commented out lines and an unnecessary `else if`
2021-09-09 17:45:37 +03:00
Igor Gov
fae5f22d25 Adding a download command if new mizu version is detected (#267) 2021-09-08 22:28:07 +03:00
M. Mert Yıldıran
eba7a3b476 Temporarily fix the filtering into its original state (#266) 2021-09-07 09:50:33 +03:00
RoyUP9
cf231538f4 added panic catch on tests (#265) 2021-09-06 11:42:47 +03:00
gadotroee
073b0b72d3 Remove fetch command (and direction) (#264) 2021-09-06 10:16:04 +03:00
25 changed files with 102 additions and 319 deletions

3
.gitignore vendored
View File

@@ -26,3 +26,6 @@ build
# Environment variables
.env
# pprof
pprof/*

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"strings"
@@ -12,7 +11,7 @@ import (
"time"
)
func TestTapAndFetch(t *testing.T) {
func TestTap(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
@@ -93,37 +92,6 @@ func TestTapAndFetch(t *testing.T) {
t.Errorf("%v", err)
return
}
fetchCmdArgs := getDefaultFetchCommandArgs()
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
t.Logf("running command: %v", fetchCmd.String())
if err := fetchCmd.Start(); err != nil {
t.Errorf("failed to start fetch command, err: %v", err)
return
}
harCheckFunc := func() error {
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
if readFileErr != nil {
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
}
harEntries, err := getEntriesFromHarBytes(harBytes)
if err != nil {
return fmt.Errorf("failed to get entries from har, err: %v", err)
}
if len(harEntries) == 0 {
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
}
return nil
}
if err := retriesExecute(shortRetriesCount, harCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
})
}
}

View File

@@ -76,13 +76,6 @@ func getDefaultTapNamespace() []string {
return []string{"-n", "mizu-tests"}
}
func getDefaultFetchCommandArgs() []string {
fetchCommand := "fetch"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{fetchCommand}, defaultCmdArgs...)
}
func getDefaultConfigCommandArgs() []string {
configCommand := "config"
defaultCmdArgs := getDefaultCommandArgs()
@@ -91,10 +84,10 @@ func getDefaultConfigCommandArgs() []string {
}
func retriesExecute(retriesCount int, executeFunc func() error) error {
var lastError error
var lastError interface{}
for i := 0; i < retriesCount; i++ {
if err := executeFunc(); err != nil {
if err := tryExecuteFunc(executeFunc); err != nil {
lastError = err
time.Sleep(1 * time.Second)
@@ -107,6 +100,16 @@ func retriesExecute(retriesCount int, executeFunc func() error) error {
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
}
func tryExecuteFunc(executeFunc func() error) (err interface{}) {
defer func() {
if panicErr := recover(); panicErr != nil {
err = panicErr
}
}()
return executeFunc()
}
func waitTapPodsReady(apiServerUrl string) error {
resolvingUrl := fmt.Sprintf("%v/status/tappersCount", apiServerUrl)
tapPodsReadyFunc := func() error {
@@ -179,19 +182,6 @@ func cleanupCommand(cmd *exec.Cmd) error {
return nil
}
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error) {
harInterface, convertErr := jsonBytesToInterface(harBytes)
if convertErr != nil {
return nil, convertErr
}
har := harInterface.(map[string]interface{})
harLog := har["log"].(map[string]interface{})
harEntries := harLog["entries"].([]interface{})
return harEntries, nil
}
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
tapStatus := tapStatusInterface.(map[string]interface{})
podsInterface := tapStatus["pods"].([]interface{})

View File

@@ -1,7 +1,6 @@
# mizu agent
Agent for MIZU (API server and tapper)
Basic APIs:
* /fetch - retrieve traffic data
* /stats - retrieve statistics of collected data
* /viewer - web ui

View File

@@ -3,6 +3,7 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/google/martian/har"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
@@ -10,11 +11,9 @@ import (
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/google/martian/har"
"github.com/romana/rlog"
tapApi "github.com/up9inc/mizu/tap/api"
@@ -64,93 +63,6 @@ func GetEntries(c *gin.Context) {
c.JSON(http.StatusOK, baseEntries)
}
func GetHARs(c *gin.Context) {
entriesFilter := &models.HarFetchRequestQuery{}
order := database.OrderDesc
if err := c.BindQuery(entriesFilter); err != nil {
c.JSON(http.StatusBadRequest, err)
}
err := validation.Validate(entriesFilter)
if err != nil {
c.JSON(http.StatusBadRequest, err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
var entries []tapApi.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)
if len(entries) > 0 {
// the entries always order from oldest to newest - we should reverse
utils.ReverseSlice(entries)
}
harsObject := map[string]*models.ExtendedHAR{}
for _, entryData := range entries {
var harEntry har.Entry
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
if entryData.ResolvedDestination != "" {
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
}
var fileName string
sourceOfEntry := entryData.ResolvedSource
if sourceOfEntry != "" {
// naively assumes the proper service source is http
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
//replace / from the file name because they end up creating a corrupted folder
fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
} else {
fileName = "unknown_source.har"
}
if harOfSource, ok := harsObject[fileName]; ok {
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
} else {
var entriesHar []*har.Entry
entriesHar = append(entriesHar, &harEntry)
harsObject[fileName] = &models.ExtendedHAR{
Log: &models.ExtendedLog{
Version: "1.2",
Creator: &models.ExtendedCreator{
Creator: &har.Creator{
Name: "mizu",
Version: "0.0.2",
},
},
Entries: entriesHar,
},
}
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
if sourceOfEntry != "" {
harsObject[fileName].Log.Creator.Source = &sourceOfEntry
}
}
}
retObj := map[string][]byte{}
for k, v := range harsObject {
bytesData, _ := json.Marshal(v)
retObj[k] = bytesData
}
buffer := utils.ZipData(retObj)
c.Data(http.StatusOK, "application/octet-stream", buffer.Bytes())
}
func UploadEntries(c *gin.Context) {
rlog.Infof("Upload entries - started\n")

View File

@@ -15,8 +15,6 @@ func EntriesRoutes(ginApp *gin.Engine) {
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
routeGroup.GET("/har", controllers.GetHARs)
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.GET("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB

View File

@@ -1,7 +1,6 @@
package apiserver
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
@@ -131,28 +130,6 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
return generalStats, nil
}
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
if !provider.isReady {
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
}
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
if err != nil {
return nil, fmt.Errorf("failed getting har from api server %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed reading hars %w", err)
}
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
return nil, fmt.Errorf("failed craeting zip reader %w", err)
}
return zipReader, nil
}
func (provider *apiServerProvider) GetVersion() (string, error) {
if !provider.isReady {

View File

@@ -1,46 +0,0 @@
package cmd
import (
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
)
var fetchCmd = &cobra.Command{
Use: "fetch",
Short: "Download recorded traffic to files",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("fetch", config.Config.Fetch)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
return nil
}
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
return err
} else if !isCompatible {
return nil
}
RunMizuFetch()
return nil
},
}
func init() {
rootCmd.AddCommand(fetchCmd)
defaultFetchConfig := configStructs.FetchConfig{}
defaults.Set(&defaultFetchConfig)
fetchCmd.Flags().StringP(configStructs.DirectoryFetchName, "d", defaultFetchConfig.Directory, "Provide a custom directory for fetched entries")
fetchCmd.Flags().Int(configStructs.FromTimestampFetchName, defaultFetchConfig.FromTimestamp, "Custom start timestamp for fetched entries")
fetchCmd.Flags().Int(configStructs.ToTimestampFetchName, defaultFetchConfig.ToTimestamp, "Custom end timestamp fetched entries")
fetchCmd.Flags().Uint16P(configStructs.GuiPortFetchName, "p", defaultFetchConfig.GuiPort, "Provide a custom port for the web interface webserver")
}

View File

@@ -1,25 +0,0 @@
package cmd
import (
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
)
func RunMizuFetch() {
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
}
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
if err != nil {
logger.Log.Errorf("Failed fetch data from API server %v", err)
return
}
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
logger.Log.Debugf("[ERROR] failed unzip %v", err)
}
}

View File

@@ -64,7 +64,6 @@ func init() {
tapCmd.Flags().StringSliceP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().String(configStructs.DirectionTapName, defaultTapConfig.Direction, "Record traffic that goes in this direction (relative to the tapped pod): in/any")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
}

View File

@@ -18,7 +18,6 @@ const (
type ConfigStruct struct {
Tap configStructs.TapConfig `yaml:"tap"`
Fetch configStructs.FetchConfig `yaml:"fetch"`
Version configStructs.VersionConfig `yaml:"version"`
View configStructs.ViewConfig `yaml:"view"`
Logs configStructs.LogsConfig `yaml:"logs"`

View File

@@ -1,15 +0,0 @@
package configStructs
const (
DirectoryFetchName = "directory"
FromTimestampFetchName = "from"
ToTimestampFetchName = "to"
GuiPortFetchName = "gui-port"
)
type FetchConfig struct {
Directory string `yaml:"directory" default:"."`
FromTimestamp int `yaml:"from" default:"0"`
ToTimestamp int `yaml:"to" default:"0"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
}

View File

@@ -3,10 +3,8 @@ package configStructs
import (
"errors"
"fmt"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/units"
"regexp"
)
const (
@@ -17,7 +15,6 @@ const (
PlainTextFilterRegexesTapName = "regex-masking"
DisableRedactionTapName = "no-redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
DirectionTapName = "direction"
DryRunTapName = "dry-run"
EnforcePolicyFile = "test-rules"
)
@@ -34,7 +31,6 @@ type TapConfig struct {
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
Direction string `yaml:"direction" default:"in"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"test-rules"`
ApiServerResources Resources `yaml:"api-server-resources"`
@@ -69,10 +65,5 @@ func (config *TapConfig) Validate() error {
return errors.New(fmt.Sprintf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize))
}
directionLowerCase := strings.ToLower(config.Direction)
if directionLowerCase != "any" && directionLowerCase != "in" {
return errors.New(fmt.Sprintf("%s is not a valid value for flag --%s. Acceptable values are in/any.", config.Direction, DirectionTapName))
}
return nil
}

View File

@@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(

View File

@@ -8,6 +8,7 @@ import (
"github.com/up9inc/mizu/cli/mizu"
"io/ioutil"
"net/http"
"runtime"
"time"
"github.com/google/go-github/v37/github"
@@ -76,7 +77,7 @@ func CheckNewerVersion(versionChan chan string) {
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
versionChan <- fmt.Sprintf("Update available! %v -> %v (%v)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL)
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL, runtime.GOOS)
} else {
versionChan <- ""
}

View File

@@ -8,4 +8,5 @@ const (
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
RulePolicyPath = "/app/enforce-policy/"
RulePolicyFileName = "enforce-policy.yaml"
GoGCEnvVar = "GOGC"
)

View File

@@ -132,6 +132,7 @@ type BaseEntryDetails struct {
Url string `json:"url,omitempty"`
RequestSenderIp string `json:"request_sender_ip,omitempty"`
Service string `json:"service,omitempty"`
Path string `json:"path,omitempty"`
Summary string `json:"summary,omitempty"`
StatusCode int `json:"status_code"`
Method string `json:"method,omitempty"`

View File

@@ -82,9 +82,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
if err == io.EOF {
// We must read until we see an EOF... very important!
return errors.New("AMQP EOF")
} else if err != nil {
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
return err
}
switch f := frame.(type) {
@@ -101,6 +98,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties
default:
frame = nil
}
case *BodyFrame:
@@ -115,6 +113,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
eventBasicDeliver.Body = f.Body
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default:
body = nil
frame = nil
}
case *MethodFrame:
@@ -200,6 +200,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default:
frame = nil
}

View File

@@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
channel := binary.BigEndian.Uint16(scratch[1:3])
size := binary.BigEndian.Uint32(scratch[3:7])
if size > 1000000 {
if size > 1000000*16 {
return nil, ErrMaxSize
}
@@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
return
}
if hf.Size > 512 {
return nil, ErrMaxHeaderFrameSize
}
var flags uint16
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
@@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
}
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
bf.Body = nil
return nil, err
}

View File

@@ -10,7 +10,6 @@ package main
import (
"encoding/binary"
"fmt"
"io"
)
@@ -19,32 +18,35 @@ import (
// ErrCredentials. The text of the error is likely more interesting than
// these constants.
const (
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
MaxHeaderFrameSizeError = 552
BadMethodFrameUnknownMethod = 601
BadMethodFrameUnknownClass = 602
)
func isSoftExceptionCode(code int) bool {
@@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 20: // channel
@@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 40: // exchange
@@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 50: // queue
@@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 60: // basic
@@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 90: // tx
@@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 85: // confirm
@@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
default:
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId)
return nil, ErrBadMethodFrameUnknownClass
}
return mf, nil

View File

@@ -60,8 +60,13 @@ var (
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
// ErrClosed is returned when the channel or connection is not open
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
)
// Error captures the code and reason a channel or connection has been closed

View File

@@ -201,6 +201,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
Path: entry.Path,
Summary: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,

View File

@@ -18,6 +18,7 @@ import (
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
@@ -168,11 +169,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
}
func startMemoryProfiler() {
dirname := "/app/pprof"
rlog.Info("Profiling is on, results will be written to %s", dirname)
dumpPath := "/app/pprof"
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
if envDumpPath != "" {
dumpPath = envDumpPath
}
timeInterval := 60
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dirname); os.IsNotExist(err) {
if err := os.Mkdir(dirname, 0777); err != nil {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
log.Fatal("could not create directory for profile: ", err)
}
}
@@ -180,9 +193,9 @@ func startMemoryProfiler() {
for true {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
rlog.Info("Writing memory profile to %s\n", filename)
rlog.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
@@ -193,7 +206,7 @@ func startMemoryProfiler() {
log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Minute)
time.Sleep(time.Second * time.Duration(timeInterval))
}
}()
}

View File

@@ -7,6 +7,8 @@ import (
const (
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
MaxBufferedPagesTotalDefaultValue = 5000

View File

@@ -54,8 +54,8 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
const filterEntries = useCallback((entry) => {
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.status_code >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.status_code < 400) return;
return entry;
},[methodsFilter, pathFilter, statusFilter])