From 1a5378b64b9b5574f8eb7a9996ced9ac932f6b52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Wed, 13 Apr 2022 23:58:05 -0700 Subject: [PATCH] Increase the OOM risk in exchange of less idle CPU usage (#979) * Increase the OOM risk in exchange of less idle CPU usage * Read the interval from an environment variable named `CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS` * Log the `getCloseTimedoutTcpChannelsInterval` return value --- tap/settings.go | 1 + tap/tcp_streams_map.go | 31 +++++++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/tap/settings.go b/tap/settings.go index b9e25da6d..0a8c573f1 100644 --- a/tap/settings.go +++ b/tap/settings.go @@ -13,6 +13,7 @@ const ( MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL" MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION" TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS" + CloseTimedoutTcpChannelsIntervalMsEnvVar = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS" MaxBufferedPagesTotalDefaultValue = 5000 MaxBufferedPagesPerConnectionDefaultValue = 5000 TcpStreamChannelTimeoutMsDefaultValue = 10000 diff --git a/tap/tcp_streams_map.go b/tap/tcp_streams_map.go index 7e4489dd7..9a94f41ce 100644 --- a/tap/tcp_streams_map.go +++ b/tap/tcp_streams_map.go @@ -1,8 +1,10 @@ package tap import ( + "os" "runtime" _debug "runtime/debug" + "strconv" "sync" "time" @@ -34,10 +36,35 @@ func (streamMap *tcpStreamMap) nextId() int64 { return streamMap.streamId } +func (streamMap *tcpStreamMap) getCloseTimedoutTcpChannelsInterval() time.Duration { + defaultDuration := 1000 * time.Millisecond + rangeMin := 10 + rangeMax := 10000 + closeTimedoutTcpChannelsIntervalMsStr := os.Getenv(CloseTimedoutTcpChannelsIntervalMsEnvVar) + if closeTimedoutTcpChannelsIntervalMsStr == "" { + return defaultDuration + } else { + closeTimedoutTcpChannelsIntervalMs, err := strconv.Atoi(closeTimedoutTcpChannelsIntervalMsStr) + if err != nil { + logger.Log.Warningf("Error parsing environment variable %s: %v\n", CloseTimedoutTcpChannelsIntervalMsEnvVar, err) + return defaultDuration + } else { + if closeTimedoutTcpChannelsIntervalMs < rangeMin || closeTimedoutTcpChannelsIntervalMs > rangeMax { + logger.Log.Warningf("The value of environment variable %s is not in acceptable range: %d - %d\n", CloseTimedoutTcpChannelsIntervalMsEnvVar, rangeMin, rangeMax) + return defaultDuration + } else { + return time.Duration(closeTimedoutTcpChannelsIntervalMs) * time.Millisecond + } + } + } +} + func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() { tcpStreamChannelTimeout := GetTcpChannelTimeoutMs() + closeTimedoutTcpChannelsIntervalMs := streamMap.getCloseTimedoutTcpChannelsInterval() + logger.Log.Infof("Using %d ms as the close timedout TCP stream channels interval", closeTimedoutTcpChannelsIntervalMs/time.Millisecond) for { - time.Sleep(10 * time.Millisecond) + time.Sleep(closeTimedoutTcpChannelsIntervalMs) _debug.FreeOSMemory() streamMap.streams.Range(func(key interface{}, value interface{}) bool { streamWrapper := value.(*tcpStreamWrapper) @@ -47,7 +74,7 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() { stream.Close() diagnose.AppStats.IncDroppedTcpStreams() logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d", - diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000) + diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/time.Millisecond) } } else { if !stream.superIdentifier.IsClosedOthers {