From 7cc077c8a0769136bf11721a748db8d1b7a87c1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Thu, 9 Sep 2021 17:45:37 +0300 Subject: [PATCH] Fix the memory exhaustion by optimizing max. AMQP message size and `GOGC` (#257) * Permanently resolve the memory exhaustion in AMQP Introduce; - `MEMORY_PROFILING_DUMP_PATH` - `MEMORY_PROFILING_TIME_INTERVAL` environment variables and make `startMemoryProfiler` method more parameterized. * Fix a leak in HTTP * Revert "Fix a leak in HTTP" This reverts commit 9d46820ff3730bc5ef80f90243a42b12e8f695fb. * Set maximum AMQP message size to 16MB * Set `GOGC` to 12800 * Remove some commented out lines and an unnecessary `else if` --- .gitignore | 3 ++ cli/kubernetes/provider.go | 1 + shared/consts.go | 1 + tap/extensions/amqp/main.go | 7 ++-- tap/extensions/amqp/read.go | 7 +++- tap/extensions/amqp/spec091.go | 72 +++++++++++++++++----------------- tap/extensions/amqp/types.go | 9 ++++- tap/passive_tapper.go | 27 +++++++++---- tap/settings.go | 2 + 9 files changed, 81 insertions(+), 48 deletions(-) diff --git a/.gitignore b/.gitignore index 59674fecc..4d5c55b7e 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ build # Environment variables .env + +# pprof +pprof/* diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 7c76290b9..64dfb8cb9 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac agentContainer.WithEnv( applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)), + applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"), ) agentContainer.WithEnv( applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom( diff --git a/shared/consts.go b/shared/consts.go index 71cafdff2..4241c6314 100644 --- a/shared/consts.go +++ b/shared/consts.go @@ -8,4 +8,5 @@ const ( MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES" RulePolicyPath = "/app/enforce-policy/" RulePolicyFileName = "enforce-policy.yaml" + GoGCEnvVar = "GOGC" ) diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index af372b77d..91b770ee9 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -82,9 +82,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co if err == io.EOF { // We must read until we see an EOF... very important! return errors.New("AMQP EOF") - } else if err != nil { - // TODO: Causes ignoring some methods. Return only in case of a certain error. But what? - return err } switch f := frame.(type) { @@ -101,6 +98,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co case *BasicDeliver: eventBasicDeliver.Properties = header.Properties default: + frame = nil } case *BodyFrame: @@ -115,6 +113,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co eventBasicDeliver.Body = f.Body emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter) default: + body = nil + frame = nil } case *MethodFrame: @@ -200,6 +200,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter) default: + frame = nil } diff --git a/tap/extensions/amqp/read.go b/tap/extensions/amqp/read.go index e31bb7198..961ce0897 100644 --- a/tap/extensions/amqp/read.go +++ b/tap/extensions/amqp/read.go @@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) { channel := binary.BigEndian.Uint16(scratch[1:3]) size := binary.BigEndian.Uint32(scratch[3:7]) - if size > 1000000 { + if size > 1000000*16 { return nil, ErrMaxSize } @@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame, return } + if hf.Size > 512 { + return nil, ErrMaxHeaderFrameSize + } + var flags uint16 if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil { @@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e } if _, err = io.ReadFull(r.R, bf.Body); err != nil { + bf.Body = nil return nil, err } diff --git a/tap/extensions/amqp/spec091.go b/tap/extensions/amqp/spec091.go index ea8448da4..3dd3dcbd3 100644 --- a/tap/extensions/amqp/spec091.go +++ b/tap/extensions/amqp/spec091.go @@ -10,7 +10,6 @@ package main import ( "encoding/binary" - "fmt" "io" ) @@ -19,32 +18,35 @@ import ( // ErrCredentials. The text of the error is likely more interesting than // these constants. const ( - frameMethod = 1 - frameHeader = 2 - frameBody = 3 - frameHeartbeat = 8 - frameMinSize = 4096 - frameEnd = 206 - replySuccess = 200 - ContentTooLarge = 311 - NoRoute = 312 - NoConsumers = 313 - ConnectionForced = 320 - InvalidPath = 402 - AccessRefused = 403 - NotFound = 404 - ResourceLocked = 405 - PreconditionFailed = 406 - FrameError = 501 - SyntaxError = 502 - CommandInvalid = 503 - ChannelError = 504 - UnexpectedFrame = 505 - ResourceError = 506 - NotAllowed = 530 - NotImplemented = 540 - InternalError = 541 - MaxSizeError = 551 + frameMethod = 1 + frameHeader = 2 + frameBody = 3 + frameHeartbeat = 8 + frameMinSize = 4096 + frameEnd = 206 + replySuccess = 200 + ContentTooLarge = 311 + NoRoute = 312 + NoConsumers = 313 + ConnectionForced = 320 + InvalidPath = 402 + AccessRefused = 403 + NotFound = 404 + ResourceLocked = 405 + PreconditionFailed = 406 + FrameError = 501 + SyntaxError = 502 + CommandInvalid = 503 + ChannelError = 504 + UnexpectedFrame = 505 + ResourceError = 506 + NotAllowed = 530 + NotImplemented = 540 + InternalError = 541 + MaxSizeError = 551 + MaxHeaderFrameSizeError = 552 + BadMethodFrameUnknownMethod = 601 + BadMethodFrameUnknownClass = 602 ) func isSoftExceptionCode(code int) bool { @@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err mf.Method = method default: - return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + return nil, ErrBadMethodFrameUnknownMethod } case 20: // channel @@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err mf.Method = method default: - return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + return nil, ErrBadMethodFrameUnknownMethod } case 40: // exchange @@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err mf.Method = method default: - return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + return nil, ErrBadMethodFrameUnknownMethod } case 50: // queue @@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err mf.Method = method default: - return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + return nil, ErrBadMethodFrameUnknownMethod } case 60: // basic @@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err mf.Method = method default: - return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + return nil, ErrBadMethodFrameUnknownMethod } case 90: // tx @@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err mf.Method = method default: - return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + return nil, ErrBadMethodFrameUnknownMethod } case 85: // confirm @@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err mf.Method = method default: - return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) + return nil, ErrBadMethodFrameUnknownMethod } default: - return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId) + return nil, ErrBadMethodFrameUnknownClass } return mf, nil diff --git a/tap/extensions/amqp/types.go b/tap/extensions/amqp/types.go index 78a1cf3b8..ea57556fa 100644 --- a/tap/extensions/amqp/types.go +++ b/tap/extensions/amqp/types.go @@ -60,8 +60,13 @@ var ( // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP. ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} - // ErrClosed is returned when the channel or connection is not open - ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"} + ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"} + + ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"} + + ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"} + + ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"} ) // Error captures the code and reason a channel or connection has been closed diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 58695ed70..c4f2b0a22 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -18,6 +18,7 @@ import ( "os/signal" "runtime" "runtime/pprof" + "strconv" "strings" "sync" "time" @@ -168,11 +169,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, } func startMemoryProfiler() { - dirname := "/app/pprof" - rlog.Info("Profiling is on, results will be written to %s", dirname) + dumpPath := "/app/pprof" + envDumpPath := os.Getenv(MemoryProfilingDumpPath) + if envDumpPath != "" { + dumpPath = envDumpPath + } + timeInterval := 60 + envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds) + if envTimeInterval != "" { + if i, err := strconv.Atoi(envTimeInterval); err == nil { + timeInterval = i + } + } + + rlog.Info("Profiling is on, results will be written to %s", dumpPath) go func() { - if _, err := os.Stat(dirname); os.IsNotExist(err) { - if err := os.Mkdir(dirname, 0777); err != nil { + if _, err := os.Stat(dumpPath); os.IsNotExist(err) { + if err := os.Mkdir(dumpPath, 0777); err != nil { log.Fatal("could not create directory for profile: ", err) } } @@ -180,9 +193,9 @@ func startMemoryProfiler() { for true { t := time.Now() - filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05")) + filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05")) - rlog.Info("Writing memory profile to %s\n", filename) + rlog.Infof("Writing memory profile to %s\n", filename) f, err := os.Create(filename) if err != nil { @@ -193,7 +206,7 @@ func startMemoryProfiler() { log.Fatal("could not write memory profile: ", err) } _ = f.Close() - time.Sleep(time.Minute) + time.Sleep(time.Second * time.Duration(timeInterval)) } }() } diff --git a/tap/settings.go b/tap/settings.go index 96f12b2db..439e82e3f 100644 --- a/tap/settings.go +++ b/tap/settings.go @@ -7,6 +7,8 @@ import ( const ( MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED" + MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH" + MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL" MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL" MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION" MaxBufferedPagesTotalDefaultValue = 5000