diff --git a/tap/api/settings.go b/tap/api/settings.go new file mode 100644 index 000000000..4a00142db --- /dev/null +++ b/tap/api/settings.go @@ -0,0 +1,49 @@ +package api + +import ( + "os" + "strconv" + "time" + + "github.com/up9inc/mizu/shared/logger" +) + +const ( + TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS" + TcpStreamChannelTimeoutMsDefaultValue = 10000 + CloseTimedoutTcpChannelsIntervalMsEnvVarName = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS" + CloseTimedoutTcpChannelsIntervalMsDefaultValue = 1000 + CloseTimedoutTcpChannelsIntervalMsMinValue = 10 + CloseTimedoutTcpChannelsIntervalMsMaxValue = 10000 +) + +func GetTcpChannelTimeoutMs() time.Duration { + valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName)) + if err != nil { + return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond + } + return time.Duration(valueFromEnv) * time.Millisecond +} + +func GetCloseTimedoutTcpChannelsInterval() time.Duration { + defaultDuration := CloseTimedoutTcpChannelsIntervalMsDefaultValue * time.Millisecond + rangeMin := CloseTimedoutTcpChannelsIntervalMsMinValue + rangeMax := CloseTimedoutTcpChannelsIntervalMsMaxValue + closeTimedoutTcpChannelsIntervalMsStr := os.Getenv(CloseTimedoutTcpChannelsIntervalMsEnvVarName) + if closeTimedoutTcpChannelsIntervalMsStr == "" { + return defaultDuration + } else { + closeTimedoutTcpChannelsIntervalMs, err := strconv.Atoi(closeTimedoutTcpChannelsIntervalMsStr) + if err != nil { + logger.Log.Warningf("Error parsing environment variable %s: %v\n", CloseTimedoutTcpChannelsIntervalMsEnvVarName, err) + return defaultDuration + } else { + if closeTimedoutTcpChannelsIntervalMs < rangeMin || closeTimedoutTcpChannelsIntervalMs > rangeMax { + logger.Log.Warningf("The value of environment variable %s is not in acceptable range: %d - %d\n", CloseTimedoutTcpChannelsIntervalMsEnvVarName, rangeMin, rangeMax) + return defaultDuration + } else { + return time.Duration(closeTimedoutTcpChannelsIntervalMs) * time.Millisecond + } + } + } +} diff --git a/tap/api/tcp_streams_map.go b/tap/api/tcp_streams_map.go index 1c76422bc..d2297e9ab 100644 --- a/tap/api/tcp_streams_map.go +++ b/tap/api/tcp_streams_map.go @@ -1,7 +1,12 @@ package api import ( + "runtime" "sync" + "time" + + "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap/api/diagnose" ) type TcpStreamMap struct { @@ -27,3 +32,27 @@ func (streamMap *TcpStreamMap) NextId() int64 { streamMap.streamId++ return streamMap.streamId } + +func (streamMap *TcpStreamMap) CloseTimedoutTcpStreamChannels() { + tcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs() + closeTimedoutTcpChannelsIntervalMs := GetCloseTimedoutTcpChannelsInterval() + logger.Log.Infof("Using %d ms as the close timedout TCP stream channels interval", closeTimedoutTcpChannelsIntervalMs/time.Millisecond) + + ticker := time.NewTicker(closeTimedoutTcpChannelsIntervalMs) + for { + <-ticker.C + + streamMap.Streams.Range(func(key interface{}, value interface{}) bool { + stream := value.(*TcpStream) + if stream.ProtoIdentifier.Protocol == nil { + if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) { + stream.Close() + diagnose.AppStatsInst.IncDroppedTcpStreams() + logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d", + diagnose.AppStatsInst.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeoutMs/time.Millisecond) + } + } + return true + }) + } +} diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index b178bf32a..a1a3c5de6 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -197,6 +197,8 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI } func startPassiveTapper(streamsMap *api.TcpStreamMap, assembler *tcpAssembler) { + go streamsMap.CloseTimedoutTcpStreamChannels() + diagnose.AppStatsInst.SetStartTime(time.Now()) staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds) diff --git a/tap/settings.go b/tap/settings.go index 0a8c573f1..521a267aa 100644 --- a/tap/settings.go +++ b/tap/settings.go @@ -3,7 +3,6 @@ package tap import ( "os" "strconv" - "time" ) const ( @@ -12,11 +11,8 @@ const ( MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL" MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL" MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION" - TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS" - CloseTimedoutTcpChannelsIntervalMsEnvVar = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS" MaxBufferedPagesTotalDefaultValue = 5000 MaxBufferedPagesPerConnectionDefaultValue = 5000 - TcpStreamChannelTimeoutMsDefaultValue = 10000 ) func GetMaxBufferedPagesTotal() int { @@ -35,14 +31,6 @@ func GetMaxBufferedPagesPerConnection() int { return valueFromEnv } -func GetTcpChannelTimeoutMs() time.Duration { - valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName)) - if err != nil { - return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond - } - return time.Duration(valueFromEnv) * time.Millisecond -} - func GetMemoryProfilingEnabled() bool { return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1" }