Fix the memory exhaustion by optimizing max. AMQP message size and GOGC (#257)

* Permanently resolve the memory exhaustion in AMQP

Introduce;
- `MEMORY_PROFILING_DUMP_PATH`
- `MEMORY_PROFILING_TIME_INTERVAL`
environment variables and make `startMemoryProfiler` method more parameterized.

* Fix a leak in HTTP

* Revert "Fix a leak in HTTP"

This reverts commit 9d46820ff3.

* Set maximum AMQP message size to 16MB

* Set `GOGC` to 12800

* Remove some commented out lines and an unnecessary `else if`
This commit is contained in:
M. Mert Yıldıran 2021-09-09 17:45:37 +03:00 committed by GitHub
parent fae5f22d25
commit 7cc077c8a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 81 additions and 48 deletions

3
.gitignore vendored
View File

@ -26,3 +26,6 @@ build
# Environment variables # Environment variables
.env .env
# pprof
pprof/*

View File

@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv( agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)), applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
) )
agentContainer.WithEnv( agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom( applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(

View File

@ -8,4 +8,5 @@ const (
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES" MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
RulePolicyPath = "/app/enforce-policy/" RulePolicyPath = "/app/enforce-policy/"
RulePolicyFileName = "enforce-policy.yaml" RulePolicyFileName = "enforce-policy.yaml"
GoGCEnvVar = "GOGC"
) )

View File

@ -82,9 +82,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
if err == io.EOF { if err == io.EOF {
// We must read until we see an EOF... very important! // We must read until we see an EOF... very important!
return errors.New("AMQP EOF") return errors.New("AMQP EOF")
} else if err != nil {
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
return err
} }
switch f := frame.(type) { switch f := frame.(type) {
@ -101,6 +98,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties eventBasicDeliver.Properties = header.Properties
default: default:
frame = nil
} }
case *BodyFrame: case *BodyFrame:
@ -115,6 +113,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
eventBasicDeliver.Body = f.Body eventBasicDeliver.Body = f.Body
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default: default:
body = nil
frame = nil
} }
case *MethodFrame: case *MethodFrame:
@ -200,6 +200,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default: default:
frame = nil
} }

View File

@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
channel := binary.BigEndian.Uint16(scratch[1:3]) channel := binary.BigEndian.Uint16(scratch[1:3])
size := binary.BigEndian.Uint32(scratch[3:7]) size := binary.BigEndian.Uint32(scratch[3:7])
if size > 1000000 { if size > 1000000*16 {
return nil, ErrMaxSize return nil, ErrMaxSize
} }
@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
return return
} }
if hf.Size > 512 {
return nil, ErrMaxHeaderFrameSize
}
var flags uint16 var flags uint16
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil { if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
} }
if _, err = io.ReadFull(r.R, bf.Body); err != nil { if _, err = io.ReadFull(r.R, bf.Body); err != nil {
bf.Body = nil
return nil, err return nil, err
} }

View File

@ -10,7 +10,6 @@ package main
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"io" "io"
) )
@ -45,6 +44,9 @@ const (
NotImplemented = 540 NotImplemented = 540
InternalError = 541 InternalError = 541
MaxSizeError = 551 MaxSizeError = 551
MaxHeaderFrameSizeError = 552
BadMethodFrameUnknownMethod = 601
BadMethodFrameUnknownClass = 602
) )
func isSoftExceptionCode(code int) bool { func isSoftExceptionCode(code int) bool {
@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method mf.Method = method
default: default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) return nil, ErrBadMethodFrameUnknownMethod
} }
case 20: // channel case 20: // channel
@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method mf.Method = method
default: default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) return nil, ErrBadMethodFrameUnknownMethod
} }
case 40: // exchange case 40: // exchange
@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method mf.Method = method
default: default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) return nil, ErrBadMethodFrameUnknownMethod
} }
case 50: // queue case 50: // queue
@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method mf.Method = method
default: default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) return nil, ErrBadMethodFrameUnknownMethod
} }
case 60: // basic case 60: // basic
@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method mf.Method = method
default: default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) return nil, ErrBadMethodFrameUnknownMethod
} }
case 90: // tx case 90: // tx
@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method mf.Method = method
default: default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) return nil, ErrBadMethodFrameUnknownMethod
} }
case 85: // confirm case 85: // confirm
@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method mf.Method = method
default: default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) return nil, ErrBadMethodFrameUnknownMethod
} }
default: default:
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId) return nil, ErrBadMethodFrameUnknownClass
} }
return mf, nil return mf, nil

View File

@ -60,8 +60,13 @@ var (
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP. // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
// ErrClosed is returned when the channel or connection is not open ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
) )
// Error captures the code and reason a channel or connection has been closed // Error captures the code and reason a channel or connection has been closed

View File

@ -18,6 +18,7 @@ import (
"os/signal" "os/signal"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -168,11 +169,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
} }
func startMemoryProfiler() { func startMemoryProfiler() {
dirname := "/app/pprof" dumpPath := "/app/pprof"
rlog.Info("Profiling is on, results will be written to %s", dirname) envDumpPath := os.Getenv(MemoryProfilingDumpPath)
if envDumpPath != "" {
dumpPath = envDumpPath
}
timeInterval := 60
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
go func() { go func() {
if _, err := os.Stat(dirname); os.IsNotExist(err) { if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dirname, 0777); err != nil { if err := os.Mkdir(dumpPath, 0777); err != nil {
log.Fatal("could not create directory for profile: ", err) log.Fatal("could not create directory for profile: ", err)
} }
} }
@ -180,9 +193,9 @@ func startMemoryProfiler() {
for true { for true {
t := time.Now() t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05")) filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
rlog.Info("Writing memory profile to %s\n", filename) rlog.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename) f, err := os.Create(filename)
if err != nil { if err != nil {
@ -193,7 +206,7 @@ func startMemoryProfiler() {
log.Fatal("could not write memory profile: ", err) log.Fatal("could not write memory profile: ", err)
} }
_ = f.Close() _ = f.Close()
time.Sleep(time.Minute) time.Sleep(time.Second * time.Duration(timeInterval))
} }
}() }()
} }

View File

@ -7,6 +7,8 @@ import (
const ( const (
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED" MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL" MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION" MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
MaxBufferedPagesTotalDefaultValue = 5000 MaxBufferedPagesTotalDefaultValue = 5000