Add option to memory profiling and limit page cache (#158)

This commit is contained in:
gadotroee 2021-08-03 17:30:13 +03:00 committed by GitHub
parent 69a9deab4b
commit 1e726e381b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 18 deletions

View File

@ -1,6 +1,7 @@
package tap package tap
import ( import (
"github.com/romana/rlog"
"sync" "sync"
"time" "time"
@ -20,19 +21,21 @@ type Cleaner struct {
cleanPeriod time.Duration cleanPeriod time.Duration
connectionTimeout time.Duration connectionTimeout time.Duration
stats CleanerStats stats CleanerStats
statsMutex sync.Mutex statsMutex sync.Mutex
} }
func (cl *Cleaner) clean() { func (cl *Cleaner) clean() {
startCleanTime := time.Now() startCleanTime := time.Now()
cl.assemblerMutex.Lock() cl.assemblerMutex.Lock()
rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump())
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock() cl.assemblerMutex.Unlock()
deleted := cl.matcher.deleteOlderThan(startCleanTime.Add(-cl.connectionTimeout)) deleted := cl.matcher.deleteOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.statsMutex.Lock() cl.statsMutex.Lock()
rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
cl.stats.flushed += flushed cl.stats.flushed += flushed
cl.stats.closed += closed cl.stats.closed += closed
cl.stats.deleted += deleted cl.stats.deleted += deleted
@ -55,7 +58,7 @@ func (cl *Cleaner) dumpStats() CleanerStats {
stats := CleanerStats{ stats := CleanerStats{
flushed: cl.stats.flushed, flushed: cl.stats.flushed,
closed : cl.stats.closed, closed: cl.stats.closed,
deleted: cl.stats.deleted, deleted: cl.stats.deleted,
} }

View File

@ -35,7 +35,8 @@ const AppPortsEnvVar = "APP_PORTS"
const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT" const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT"
const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB
const cleanPeriod = time.Second * 10 const cleanPeriod = time.Second * 10
var remoteOnlyOutboundPorts = []int { 80, 443 }
var remoteOnlyOutboundPorts = []int{80, 443}
func parseAppPorts(appPortsList string) []int { func parseAppPorts(appPortsList string) []int {
ports := make([]int, 0) ports := make([]int, 0)
@ -55,10 +56,10 @@ var decoder = flag.String("decoder", "", "Name of the decoder to use (default: g
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds") var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
var lazy = flag.Bool("lazy", false, "If true, do lazy decoding") var lazy = flag.Bool("lazy", false, "If true, do lazy decoding")
var nodefrag = flag.Bool("nodefrag", false, "If true, do not do IPv4 defrag") var nodefrag = flag.Bool("nodefrag", false, "If true, do not do IPv4 defrag")
var checksum = flag.Bool("checksum", false, "Check TCP checksum") // global var checksum = flag.Bool("checksum", false, "Check TCP checksum") // global
var nooptcheck = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)") // global var nooptcheck = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)") // global
var ignorefsmerr = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors") // global var ignorefsmerr = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors") // global
var allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence") // global var allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence") // global
var verbose = flag.Bool("verbose", false, "Be verbose") var verbose = flag.Bool("verbose", false, "Be verbose")
var debug = flag.Bool("debug", false, "Display debug information") var debug = flag.Bool("debug", false, "Display debug information")
var quiet = flag.Bool("quiet", false, "Be quiet regarding errors") var quiet = flag.Bool("quiet", false, "Be quiet regarding errors")
@ -68,7 +69,7 @@ var nohttp = flag.Bool("nohttp", false, "Disable HTTP parsing")
var output = flag.String("output", "", "Path to create file for HTTP 200 OK responses") var output = flag.String("output", "", "Path to create file for HTTP 200 OK responses")
var writeincomplete = flag.Bool("writeincomplete", false, "Write incomplete response") var writeincomplete = flag.Bool("writeincomplete", false, "Write incomplete response")
var hexdump = flag.Bool("dump", false, "Dump HTTP request/response as hex") // global var hexdump = flag.Bool("dump", false, "Dump HTTP request/response as hex") // global
var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex") var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")
// capture // capture
@ -87,7 +88,7 @@ var dumpToHar = flag.Bool("hardump", false, "Dump traffic to har files")
var HarOutputDir = flag.String("hardir", "", "Directory in which to store output har files") var HarOutputDir = flag.String("hardir", "", "Directory in which to store output har files")
var harEntriesPerFile = flag.Int("harentriesperfile", 200, "Number of max number of har entries to store in each file") var harEntriesPerFile = flag.Int("harentriesperfile", 200, "Number of max number of har entries to store in each file")
var reqResMatcher = createResponseRequestMatcher() // global var reqResMatcher = createResponseRequestMatcher() // global
var statsTracker = StatsTracker{} var statsTracker = StatsTracker{}
// global // global
@ -117,8 +118,8 @@ var outputLevel int
var errorsMap map[string]uint var errorsMap map[string]uint
var errorsMapMutex sync.Mutex var errorsMapMutex sync.Mutex
var nErrors uint var nErrors uint
var ownIps []string // global var ownIps []string // global
var hostMode bool // global var hostMode bool // global
/* minOutputLevel: Error will be printed only if outputLevel is above this value /* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors) * t: key for errorsMap (counting errors)
@ -196,6 +197,37 @@ func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *Outbo
return nil, outboundLinkWriter.OutChan return nil, outboundLinkWriter.OutChan
} }
func startMemoryProfiler() {
dirname := "/app/pprof"
rlog.Info("Profiling is on, results will be written to %s", dirname)
go func() {
if _, err := os.Stat(dirname); os.IsNotExist(err) {
if err := os.Mkdir(dirname, 0777); err != nil {
log.Fatal("could not create directory for profile: ", err)
}
}
for true {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
rlog.Info("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Minute)
}
}()
}
func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) { func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile) log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
@ -310,13 +342,19 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
defragger := ip4defrag.NewIPv4Defragmenter() defragger := ip4defrag.NewIPv4Defragmenter()
streamFactory := &tcpStreamFactory{ streamFactory := &tcpStreamFactory{
doHTTP: !*nohttp, doHTTP: !*nohttp,
harWriter: harWriter, harWriter: harWriter,
outbountLinkWriter: outboundLinkWriter, outbountLinkWriter: outboundLinkWriter,
} }
streamPool := reassembly.NewStreamPool(streamFactory) streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool) assembler := reassembly.NewAssembler(streamPool)
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
rlog.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
var assemblerMutex sync.Mutex var assemblerMutex sync.Mutex
signalChan := make(chan os.Signal, 1) signalChan := make(chan os.Signal, 1)
@ -324,10 +362,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds) staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
cleaner := Cleaner{ cleaner := Cleaner{
assembler: assembler, assembler: assembler,
assemblerMutex: &assemblerMutex, assemblerMutex: &assemblerMutex,
matcher: &reqResMatcher, matcher: &reqResMatcher,
cleanPeriod: cleanPeriod, cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout, connectionTimeout: staleConnectionTimeout,
} }
cleaner.start() cleaner.start()
@ -376,6 +414,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
} }
}() }()
if GetMemoryProfilingEnabled() {
startMemoryProfiler()
}
for packet := range source.Packets() { for packet := range source.Packets() {
count++ count++
rlog.Debugf("PACKET #%d", count) rlog.Debugf("PACKET #%d", count)

View File

@ -1,5 +1,18 @@
package tap package tap
import (
"os"
"strconv"
)
const (
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
MaxBufferedPagesTotalDefaultValue = 5000
MaxBufferedPagesPerConnectionDefaultValue = 5000
)
type globalSettings struct { type globalSettings struct {
filterPorts []int filterPorts []int
filterAuthorities []string filterAuthorities []string
@ -29,3 +42,23 @@ func GetFilterIPs() []string {
copy(addresses, gSettings.filterAuthorities) copy(addresses, gSettings.filterAuthorities)
return addresses return addresses
} }
func GetMaxBufferedPagesTotal() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesTotalEnvVarName))
if err != nil {
return MaxBufferedPagesTotalDefaultValue
}
return valueFromEnv
}
func GetMaxBufferedPagesPerConnection() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesPerConnectionEnvVarName))
if err != nil {
return MaxBufferedPagesPerConnectionDefaultValue
}
return valueFromEnv
}
func GetMemoryProfilingEnabled() bool {
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
}