Bring back CloseTimedoutTcpStreamChannels method

This commit is contained in:
M. Mert Yildiran 2022-04-20 13:41:00 +03:00
parent 75b44f9143
commit 57eb9034d6
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
4 changed files with 80 additions and 12 deletions

49
tap/api/settings.go Normal file
View File

@ -0,0 +1,49 @@
package api
import (
"os"
"strconv"
"time"
"github.com/up9inc/mizu/shared/logger"
)
const (
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
TcpStreamChannelTimeoutMsDefaultValue = 10000
CloseTimedoutTcpChannelsIntervalMsEnvVarName = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS"
CloseTimedoutTcpChannelsIntervalMsDefaultValue = 1000
CloseTimedoutTcpChannelsIntervalMsMinValue = 10
CloseTimedoutTcpChannelsIntervalMsMaxValue = 10000
)
func GetTcpChannelTimeoutMs() time.Duration {
valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName))
if err != nil {
return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond
}
return time.Duration(valueFromEnv) * time.Millisecond
}
func GetCloseTimedoutTcpChannelsInterval() time.Duration {
defaultDuration := CloseTimedoutTcpChannelsIntervalMsDefaultValue * time.Millisecond
rangeMin := CloseTimedoutTcpChannelsIntervalMsMinValue
rangeMax := CloseTimedoutTcpChannelsIntervalMsMaxValue
closeTimedoutTcpChannelsIntervalMsStr := os.Getenv(CloseTimedoutTcpChannelsIntervalMsEnvVarName)
if closeTimedoutTcpChannelsIntervalMsStr == "" {
return defaultDuration
} else {
closeTimedoutTcpChannelsIntervalMs, err := strconv.Atoi(closeTimedoutTcpChannelsIntervalMsStr)
if err != nil {
logger.Log.Warningf("Error parsing environment variable %s: %v\n", CloseTimedoutTcpChannelsIntervalMsEnvVarName, err)
return defaultDuration
} else {
if closeTimedoutTcpChannelsIntervalMs < rangeMin || closeTimedoutTcpChannelsIntervalMs > rangeMax {
logger.Log.Warningf("The value of environment variable %s is not in acceptable range: %d - %d\n", CloseTimedoutTcpChannelsIntervalMsEnvVarName, rangeMin, rangeMax)
return defaultDuration
} else {
return time.Duration(closeTimedoutTcpChannelsIntervalMs) * time.Millisecond
}
}
}
}

View File

@ -1,7 +1,12 @@
package api
import (
"runtime"
"sync"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api/diagnose"
)
type TcpStreamMap struct {
@ -27,3 +32,27 @@ func (streamMap *TcpStreamMap) NextId() int64 {
streamMap.streamId++
return streamMap.streamId
}
func (streamMap *TcpStreamMap) CloseTimedoutTcpStreamChannels() {
tcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
closeTimedoutTcpChannelsIntervalMs := GetCloseTimedoutTcpChannelsInterval()
logger.Log.Infof("Using %d ms as the close timedout TCP stream channels interval", closeTimedoutTcpChannelsIntervalMs/time.Millisecond)
ticker := time.NewTicker(closeTimedoutTcpChannelsIntervalMs)
for {
<-ticker.C
streamMap.Streams.Range(func(key interface{}, value interface{}) bool {
stream := value.(*TcpStream)
if stream.ProtoIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) {
stream.Close()
diagnose.AppStatsInst.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
diagnose.AppStatsInst.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeoutMs/time.Millisecond)
}
}
return true
})
}
}

View File

@ -197,6 +197,8 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
}
func startPassiveTapper(streamsMap *api.TcpStreamMap, assembler *tcpAssembler) {
go streamsMap.CloseTimedoutTcpStreamChannels()
diagnose.AppStatsInst.SetStartTime(time.Now())
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)

View File

@ -3,7 +3,6 @@ package tap
import (
"os"
"strconv"
"time"
)
const (
@ -12,11 +11,8 @@ const (
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
CloseTimedoutTcpChannelsIntervalMsEnvVar = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS"
MaxBufferedPagesTotalDefaultValue = 5000
MaxBufferedPagesPerConnectionDefaultValue = 5000
TcpStreamChannelTimeoutMsDefaultValue = 10000
)
func GetMaxBufferedPagesTotal() int {
@ -35,14 +31,6 @@ func GetMaxBufferedPagesPerConnection() int {
return valueFromEnv
}
func GetTcpChannelTimeoutMs() time.Duration {
valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName))
if err != nil {
return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond
}
return time.Duration(valueFromEnv) * time.Millisecond
}
func GetMemoryProfilingEnabled() bool {
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
}