Compare commits

...

2 Commits

Author SHA1 Message Date
M. Mert Yıldıran
65bb262652 Fix the OOMKilled error by calling debug.FreeOSMemory periodically (#277)
* Fix the OOMKilled error by calling `debug.FreeOSMemory` periodically

* Remove `MAX_NUMBER_OF_GOROUTINES` environment variable

* Change the line

* Increase the default value of `TCP_STREAM_CHANNEL_TIMEOUT_MS` to `10000`
2021-09-19 12:31:09 +03:00
RoyUP9
842d95c836 fixed redact and regex masking tests (#284) 2021-09-19 11:52:11 +03:00
4 changed files with 44 additions and 34 deletions

View File

@@ -502,10 +502,19 @@ func TestTapRedact(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
headers := entryRequest["headers"].([]interface{})
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
headers := entryDetails["headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
@@ -518,8 +527,8 @@ func TestTapRedact(t *testing.T) {
}
}
data := entryRequest["postData"].(map[string]interface{})
textDataStr := data["text"].(string)
postData := entryDetails["postData"].(map[string]interface{})
textDataStr := postData["text"].(string)
var textData map[string]string
if parseErr := json.Unmarshal([]byte(textDataStr), &textData); parseErr != nil {
@@ -608,10 +617,19 @@ func TestTapNoRedact(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
headers := entryRequest["headers"].([]interface{})
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
headers := entryDetails["headers"].([]interface{})
for _, headerInterface := range headers {
header := headerInterface.(map[string]interface{})
if header["name"].(string) != "User-Agent" {
@@ -624,8 +642,8 @@ func TestTapNoRedact(t *testing.T) {
}
}
data := entryRequest["postData"].(map[string]interface{})
textDataStr := data["text"].(string)
postData := entryDetails["postData"].(map[string]interface{})
textDataStr := postData["text"].(string)
var textData map[string]string
if parseErr := json.Unmarshal([]byte(textDataStr), &textData); parseErr != nil {
@@ -714,11 +732,20 @@ func TestTapRegexMasking(t *testing.T) {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
entry := requestResult.(map[string]interface{})["entry"].(map[string]interface{})
entryRequest := entry["request"].(map[string]interface{})
data := requestResult.(map[string]interface{})["data"].(map[string]interface{})
entryJson := data["entry"].(string)
data := entryRequest["postData"].(map[string]interface{})
textData := data["text"].(string)
var entry map[string]interface{}
if parseErr := json.Unmarshal([]byte(entryJson), &entry); parseErr != nil {
return fmt.Errorf("failed to parse entry, err: %v", parseErr)
}
entryRequest := entry["request"].(map[string]interface{})
entryPayload := entryRequest["payload"].(map[string]interface{})
entryDetails := entryPayload["details"].(map[string]interface{})
postData := entryDetails["postData"].(map[string]interface{})
textData := postData["text"].(string)
if textData != "[REDACTED]" {
return fmt.Errorf("unexpected result - body is not redacted")

View File

@@ -18,6 +18,7 @@ import (
"os"
"os/signal"
"runtime"
_debug "runtime/debug"
"runtime/pprof"
"strconv"
"strings"
@@ -217,10 +218,10 @@ func startMemoryProfiler() {
}
func closeTimedoutTcpStreamChannels() {
maxNumberOfGoroutines = GetMaxNumberOfGoroutines()
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
_debug.FreeOSMemory()
streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream

View File

@@ -13,11 +13,9 @@ const (
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
MaxNumberOfGoroutinesEnvVarName = "MAX_NUMBER_OF_GOROUTINES"
MaxBufferedPagesTotalDefaultValue = 5000
MaxBufferedPagesPerConnectionDefaultValue = 5000
TcpStreamChannelTimeoutMsDefaultValue = 5000
MaxNumberOfGoroutinesDefaultValue = 4000
TcpStreamChannelTimeoutMsDefaultValue = 10000
)
type globalSettings struct {
@@ -62,14 +60,6 @@ func GetTcpChannelTimeoutMs() time.Duration {
return time.Duration(valueFromEnv) * time.Millisecond
}
func GetMaxNumberOfGoroutines() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxNumberOfGoroutinesEnvVarName))
if err != nil {
return MaxNumberOfGoroutinesDefaultValue
}
return valueFromEnv
}
func GetMemoryProfilingEnabled() bool {
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
}

View File

@@ -2,7 +2,6 @@ package tap
import (
"fmt"
"runtime"
"sync"
"time"
@@ -33,8 +32,6 @@ type tcpStreamWrapper struct {
var streams *sync.Map = &sync.Map{} // global
var streamId int64 = 0
var maxNumberOfGoroutines int
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
rlog.Debugf("* NEW: %s %s", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
@@ -61,11 +58,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
superIdentifier: &api.SuperIdentifier{},
}
if stream.isTapTarget {
if runtime.NumGoroutine() > maxNumberOfGoroutines {
appStats.IncDroppedTcpStreams()
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine())
return stream
}
streamId++
stream.id = streamId
for i, extension := range extensions {