diff --git a/tap/cleaner.go b/tap/cleaner.go index 157a74eb0..96972fc9e 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -1,6 +1,7 @@ package tap import ( + "github.com/romana/rlog" "sync" "time" @@ -20,19 +21,21 @@ type Cleaner struct { cleanPeriod time.Duration connectionTimeout time.Duration stats CleanerStats - statsMutex sync.Mutex + statsMutex sync.Mutex } func (cl *Cleaner) clean() { startCleanTime := time.Now() cl.assemblerMutex.Lock() + rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump()) flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) cl.assemblerMutex.Unlock() deleted := cl.matcher.deleteOlderThan(startCleanTime.Add(-cl.connectionTimeout)) cl.statsMutex.Lock() + rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) cl.stats.flushed += flushed cl.stats.closed += closed cl.stats.deleted += deleted @@ -55,7 +58,7 @@ func (cl *Cleaner) dumpStats() CleanerStats { stats := CleanerStats{ flushed: cl.stats.flushed, - closed : cl.stats.closed, + closed: cl.stats.closed, deleted: cl.stats.deleted, } diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index c1eef2c75..cc58be86a 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -35,7 +35,8 @@ const AppPortsEnvVar = "APP_PORTS" const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT" const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB const cleanPeriod = time.Second * 10 -var remoteOnlyOutboundPorts = []int { 80, 443 } + +var remoteOnlyOutboundPorts = []int{80, 443} func parseAppPorts(appPortsList string) []int { ports := make([]int, 0) @@ -55,10 +56,10 @@ var decoder = flag.String("decoder", "", "Name of the decoder to use (default: g var statsevery = flag.Int("stats", 60, "Output statistics every N seconds") var lazy = flag.Bool("lazy", false, "If true, do lazy decoding") var nodefrag = flag.Bool("nodefrag", false, "If true, do not do IPv4 defrag") -var checksum = flag.Bool("checksum", false, "Check TCP checksum") // global -var nooptcheck = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)") // global -var ignorefsmerr = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors") // global -var allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence") // global +var checksum = flag.Bool("checksum", false, "Check TCP checksum") // global +var nooptcheck = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)") // global +var ignorefsmerr = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors") // global +var allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence") // global var verbose = flag.Bool("verbose", false, "Be verbose") var debug = flag.Bool("debug", false, "Display debug information") var quiet = flag.Bool("quiet", false, "Be quiet regarding errors") @@ -68,7 +69,7 @@ var nohttp = flag.Bool("nohttp", false, "Disable HTTP parsing") var output = flag.String("output", "", "Path to create file for HTTP 200 OK responses") var writeincomplete = flag.Bool("writeincomplete", false, "Write incomplete response") -var hexdump = flag.Bool("dump", false, "Dump HTTP request/response as hex") // global +var hexdump = flag.Bool("dump", false, "Dump HTTP request/response as hex") // global var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex") // capture @@ -87,7 +88,7 @@ var dumpToHar = flag.Bool("hardump", false, "Dump traffic to har files") var HarOutputDir = flag.String("hardir", "", "Directory in which to store output har files") var harEntriesPerFile = flag.Int("harentriesperfile", 200, "Number of max number of har entries to store in each file") -var reqResMatcher = createResponseRequestMatcher() // global +var reqResMatcher = createResponseRequestMatcher() // global var statsTracker = StatsTracker{} // global @@ -117,8 +118,8 @@ var outputLevel int var errorsMap map[string]uint var errorsMapMutex sync.Mutex var nErrors uint -var ownIps []string // global -var hostMode bool // global +var ownIps []string // global +var hostMode bool // global /* minOutputLevel: Error will be printed only if outputLevel is above this value * t: key for errorsMap (counting errors) @@ -196,6 +197,37 @@ func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *Outbo return nil, outboundLinkWriter.OutChan } +func startMemoryProfiler() { + dirname := "/app/pprof" + rlog.Info("Profiling is on, results will be written to %s", dirname) + go func() { + if _, err := os.Stat(dirname); os.IsNotExist(err) { + if err := os.Mkdir(dirname, 0777); err != nil { + log.Fatal("could not create directory for profile: ", err) + } + } + + for true { + t := time.Now() + + filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05")) + + rlog.Info("Writing memory profile to %s\n", filename) + + f, err := os.Create(filename) + if err != nil { + log.Fatal("could not create memory profile: ", err) + } + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + log.Fatal("could not write memory profile: ", err) + } + _ = f.Close() + time.Sleep(time.Minute) + } + }() +} + func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) { log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile) @@ -310,13 +342,19 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr defragger := ip4defrag.NewIPv4Defragmenter() streamFactory := &tcpStreamFactory{ - doHTTP: !*nohttp, - harWriter: harWriter, + doHTTP: !*nohttp, + harWriter: harWriter, outbountLinkWriter: outboundLinkWriter, - } streamPool := reassembly.NewStreamPool(streamFactory) assembler := reassembly.NewAssembler(streamPool) + + maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection() + maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal() + rlog.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection) + assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal + assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection + var assemblerMutex sync.Mutex signalChan := make(chan os.Signal, 1) @@ -324,10 +362,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds) cleaner := Cleaner{ - assembler: assembler, - assemblerMutex: &assemblerMutex, - matcher: &reqResMatcher, - cleanPeriod: cleanPeriod, + assembler: assembler, + assemblerMutex: &assemblerMutex, + matcher: &reqResMatcher, + cleanPeriod: cleanPeriod, connectionTimeout: staleConnectionTimeout, } cleaner.start() @@ -376,6 +414,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr } }() + if GetMemoryProfilingEnabled() { + startMemoryProfiler() + } + for packet := range source.Packets() { count++ rlog.Debugf("PACKET #%d", count) diff --git a/tap/settings.go b/tap/settings.go index cf89dd345..7c8636239 100644 --- a/tap/settings.go +++ b/tap/settings.go @@ -1,5 +1,18 @@ package tap +import ( + "os" + "strconv" +) + +const ( + MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED" + MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL" + MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION" + MaxBufferedPagesTotalDefaultValue = 5000 + MaxBufferedPagesPerConnectionDefaultValue = 5000 +) + type globalSettings struct { filterPorts []int filterAuthorities []string @@ -29,3 +42,23 @@ func GetFilterIPs() []string { copy(addresses, gSettings.filterAuthorities) return addresses } + +func GetMaxBufferedPagesTotal() int { + valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesTotalEnvVarName)) + if err != nil { + return MaxBufferedPagesTotalDefaultValue + } + return valueFromEnv +} + +func GetMaxBufferedPagesPerConnection() int { + valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesPerConnectionEnvVarName)) + if err != nil { + return MaxBufferedPagesPerConnectionDefaultValue + } + return valueFromEnv +} + +func GetMemoryProfilingEnabled() bool { + return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1" +}