Compare commits

..

4 Commits

Author SHA1 Message Date
RoyUP9
6b52458642 removed exit if browser not supported (#270) 2021-09-12 16:09:04 +03:00
M. Mert Yıldıran
858a64687d Stop the hanging Goroutines by dropping the old, unidentified TCP streams (#260)
* Close the hanging TCP message channels after a dynamically aligned timeout (base `10000` milliseconds)

* Bring back `source.Lazy`

* Add a one more `sync.Map.Delete` call

* Improve the formula by taking base Goroutine count into account

* Reduce duplication

* Include the dropped TCP streams count into the stats tracker and print a debug log whenever it happens

* Add `superIdentifier` field to `tcpStream` to check if it has identified

Also stop the other protocol dissectors if a TCP stream identified by a protocol.

* Take one step forward in fixing the channel closing issue (WIP)

Add `sync.Mutex` to `tcpReader` and make the loops reference based.

* Fix the channel closing issue

* Improve the accuracy of the formula, log better and multiply `baseStreamChannelTimeoutMs` by 100

* Remove `fmt.Printf`

* Replace `runtime.Gosched()` with `time.Sleep(1 * time.Millisecond)`

* Close the channels of other protocols in case of an identification

* Simplify the logic

* Replace the formula with hard timeout 5000 milliseconds and 4000 maximum number of Goroutines
2021-09-12 08:26:48 +03:00
M. Mert Yıldıran
819ccf54cd Fix the build error in PR validation (#269)
* Fix the build error in PR validation (temp)

* Set Go version to 1.16

* Revert "Fix the build error in PR validation (temp)"

This reverts commit 4cb613251c.
2021-09-11 21:23:15 +03:00
M. Mert Yıldıran
7cc077c8a0 Fix the memory exhaustion by optimizing max. AMQP message size and GOGC (#257)
* Permanently resolve the memory exhaustion in AMQP

Introduce;
- `MEMORY_PROFILING_DUMP_PATH`
- `MEMORY_PROFILING_TIME_INTERVAL`
environment variables and make `startMemoryProfiler` method more parameterized.

* Fix a leak in HTTP

* Revert "Fix a leak in HTTP"

This reverts commit 9d46820ff3.

* Set maximum AMQP message size to 16MB

* Set `GOGC` to 12800

* Remove some commented out lines and an unnecessary `else if`
2021-09-09 17:45:37 +03:00
18 changed files with 311 additions and 98 deletions

View File

@@ -18,7 +18,7 @@ jobs:
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '^1.16'
go-version: '1.16'
- name: Check out code into the Go module directory
uses: actions/checkout@v2
@@ -33,7 +33,7 @@ jobs:
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '^1.16'
go-version: '1.16'
- name: Check out code into the Go module directory
uses: actions/checkout@v2

3
.gitignore vendored
View File

@@ -26,3 +26,6 @@ build
# Environment variables
.env
# pprof
pprof/*

View File

@@ -3,7 +3,6 @@ package cmd
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
@@ -63,8 +62,8 @@ func openBrowser(url string) {
default:
err = fmt.Errorf("unsupported platform")
}
if err != nil {
log.Fatal(err)
}
if err != nil {
logger.Log.Errorf("error while opening browser, %v", err)
}
}

View File

@@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(

View File

@@ -8,4 +8,5 @@ const (
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
RulePolicyPath = "/app/enforce-policy/"
RulePolicyFileName = "enforce-policy.yaml"
GoGCEnvVar = "GOGC"
)

View File

@@ -21,7 +21,7 @@ type Protocol struct {
}
type Extension struct {
Protocol Protocol
Protocol *Protocol
Path string
Plug *plugin.Plugin
Dissector Dissector
@@ -72,10 +72,15 @@ type SuperTimer struct {
CaptureTime time.Time
}
type SuperIdentifier struct {
Protocol *Protocol
IsClosedOthers bool
}
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, emitter Emitter) error
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)

View File

@@ -32,7 +32,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = protocol
extension.Protocol = &protocol
}
func (d dissecting) Ping() {
@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
r := AmqpReader{b}
var remaining int
@@ -78,13 +78,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
var lastMethodFrameMessage Message
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
frame, err := r.ReadFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return errors.New("AMQP EOF")
} else if err != nil {
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
return err
}
switch f := frame.(type) {
@@ -101,6 +102,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties
default:
frame = nil
}
case *BodyFrame:
@@ -110,11 +112,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default:
body = nil
frame = nil
}
case *MethodFrame:
@@ -134,6 +140,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicConsume:
@@ -146,6 +153,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
@@ -165,6 +173,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ExchangeDeclare:
@@ -178,6 +187,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionStart:
@@ -188,6 +198,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionClose:
@@ -197,9 +208,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ClassId: m.ClassId,
MethodId: m.MethodId,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default:
frame = nil
}

View File

@@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
channel := binary.BigEndian.Uint16(scratch[1:3])
size := binary.BigEndian.Uint32(scratch[3:7])
if size > 1000000 {
if size > 1000000*16 {
return nil, ErrMaxSize
}
@@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
return
}
if hf.Size > 512 {
return nil, ErrMaxHeaderFrameSize
}
var flags uint16
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
@@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
}
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
bf.Body = nil
return nil, err
}

View File

@@ -10,7 +10,6 @@ package main
import (
"encoding/binary"
"fmt"
"io"
)
@@ -19,32 +18,35 @@ import (
// ErrCredentials. The text of the error is likely more interesting than
// these constants.
const (
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
MaxHeaderFrameSizeError = 552
BadMethodFrameUnknownMethod = 601
BadMethodFrameUnknownClass = 602
)
func isSoftExceptionCode(code int) bool {
@@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 20: // channel
@@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 40: // exchange
@@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 50: // queue
@@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 60: // basic
@@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 90: // tx
@@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 85: // confirm
@@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
default:
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId)
return nil, ErrBadMethodFrameUnknownClass
}
return mf, nil

View File

@@ -60,8 +60,13 @@ var (
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
// ErrClosed is returned when the channel or connection is not open
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
)
// Error captures the code and reason a channel or connection has been closed

View File

@@ -3,6 +3,7 @@ package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@@ -52,7 +53,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = protocol
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
@@ -60,7 +61,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@@ -77,8 +78,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
grpcAssembler = createGrpcAssembler(b)
}
success := false
dissected := false
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -87,7 +92,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -96,7 +101,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -105,13 +110,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
}
}
if !success {
if !dissected {
return err
}
superIdentifier.Protocol = &protocol
return nil
}

View File

@@ -3,6 +3,7 @@ package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"log"
"time"
@@ -30,7 +31,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = _protocol
extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
@@ -38,18 +39,24 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID, superTimer)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, superTimer, emitter)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
}
}
}

View File

@@ -13,11 +13,13 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
@@ -94,6 +96,8 @@ var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
const baseStreamChannelTimeoutMs int = 5000 * 100
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
@@ -168,11 +172,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
}
func startMemoryProfiler() {
dirname := "/app/pprof"
rlog.Info("Profiling is on, results will be written to %s", dirname)
dumpPath := "/app/pprof"
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
if envDumpPath != "" {
dumpPath = envDumpPath
}
timeInterval := 60
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dirname); os.IsNotExist(err) {
if err := os.Mkdir(dirname, 0777); err != nil {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
log.Fatal("could not create directory for profile: ", err)
}
}
@@ -180,9 +196,9 @@ func startMemoryProfiler() {
for true {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
rlog.Info("Writing memory profile to %s\n", filename)
rlog.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
@@ -193,13 +209,50 @@ func startMemoryProfiler() {
log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Minute)
time.Sleep(time.Second * time.Duration(timeInterval))
}
}()
}
func closeTimedoutTcpStreamChannels() {
maxNumberOfGoroutines = GetMaxNumberOfGoroutines()
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
stream.Close()
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
for i := range stream.clients {
reader := &stream.clients[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
for i := range stream.servers {
reader := &stream.servers[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
stream.superIdentifier.IsClosedOthers = true
}
}
return true
})
}
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
go closeTimedoutTcpStreamChannels()
defer util.Run()()
if *debug {
@@ -354,7 +407,14 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
startMemoryProfiler()
}
for packet := range source.Packets() {
for {
packet, err := source.NextPacket()
if err == io.EOF {
break
} else if err != nil {
rlog.Debugf("Error:", err)
continue
}
packetsCount := statsTracker.incPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount)
data := packet.Data()

View File

@@ -3,14 +3,21 @@ package tap
import (
"os"
"strconv"
"time"
)
const (
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
MaxNumberOfGoroutinesEnvVarName = "MAX_NUMBER_OF_GOROUTINES"
MaxBufferedPagesTotalDefaultValue = 5000
MaxBufferedPagesPerConnectionDefaultValue = 5000
TcpStreamChannelTimeoutMsDefaultValue = 5000
MaxNumberOfGoroutinesDefaultValue = 4000
)
type globalSettings struct {
@@ -47,6 +54,22 @@ func GetMaxBufferedPagesPerConnection() int {
return valueFromEnv
}
func GetTcpChannelTimeoutMs() time.Duration {
valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName))
if err != nil {
return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond
}
return time.Duration(valueFromEnv) * time.Millisecond
}
func GetMaxNumberOfGoroutines() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxNumberOfGoroutinesEnvVarName))
if err != nil {
return MaxNumberOfGoroutinesDefaultValue
}
return valueFromEnv
}
func GetMemoryProfilingEnabled() bool {
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
}

View File

@@ -13,6 +13,7 @@ type AppStats struct {
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
MatchedPairs int64 `json:"matchedPairs"`
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
}
type StatsTracker struct {
@@ -23,6 +24,7 @@ type StatsTracker struct {
reassembledTcpPayloadsCountMutex sync.Mutex
tlsConnectionsCountMutex sync.Mutex
matchedPairsMutex sync.Mutex
droppedTcpStreamsMutex sync.Mutex
}
func (st *StatsTracker) incMatchedPairs() {
@@ -31,6 +33,12 @@ func (st *StatsTracker) incMatchedPairs() {
st.matchedPairsMutex.Unlock()
}
func (st *StatsTracker) incDroppedTcpStreams() {
st.droppedTcpStreamsMutex.Lock()
st.appStats.DroppedTcpStreams++
st.droppedTcpStreamsMutex.Unlock()
}
func (st *StatsTracker) incPacketsCount() int64 {
st.packetsCountMutex.Lock()
st.appStats.PacketsCount++
@@ -100,5 +108,10 @@ func (st *StatsTracker) dumpStats() *AppStats {
st.appStats.MatchedPairs = 0
st.matchedPairsMutex.Unlock()
st.droppedTcpStreamsMutex.Lock()
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
st.appStats.DroppedTcpStreams = 0
st.droppedTcpStreamsMutex.Unlock()
return currentAppStats
}

View File

@@ -47,6 +47,7 @@ func (tid *tcpID) String() string {
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
@@ -59,6 +60,7 @@ type tcpReader struct {
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
sync.Mutex
}
func (h *tcpReader) Read(p []byte) (int, error) {
@@ -93,10 +95,19 @@ func (h *tcpReader) Read(p []byte) (int, error) {
return l, nil
}
func (h *tcpReader) Close() {
h.Lock()
if !h.isClosed {
h.isClosed = true
close(h.msgQueue)
}
h.Unlock()
}
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.emitter)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
if err != nil {
io.Copy(ioutil.Discard, b)
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
)
/* It's a connection (bidirectional)
@@ -16,16 +17,19 @@ import (
* In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel.
*/
type tcpStream struct {
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
clients []tcpReader
servers []tcpReader
urls []string
ident string
id int64
isClosed bool
superIdentifier *api.SuperIdentifier
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
clients []tcpReader
servers []tcpReader
urls []string
ident string
sync.Mutex
}
@@ -146,12 +150,22 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
statsTracker.incReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for _, reader := range t.clients {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
for i := range t.clients {
reader := &t.clients[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
} else {
for _, reader := range t.servers {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
for i := range t.servers {
reader := &t.servers[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
}
}
@@ -160,14 +174,33 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isTapTarget {
for _, reader := range t.clients {
close(reader.msgQueue)
}
for _, reader := range t.servers {
close(reader.msgQueue)
}
if t.isTapTarget && !t.isClosed {
t.Close()
}
// do not remove the connection to allow last ACK
return false
}
func (t *tcpStream) Close() {
shouldReturn := false
t.Lock()
if t.isClosed {
shouldReturn = true
} else {
t.isClosed = true
}
t.Unlock()
if shouldReturn {
return
}
streams.Delete(t.id)
for i := range t.clients {
reader := &t.clients[i]
reader.Close()
}
for i := range t.servers {
reader := &t.servers[i]
reader.Close()
}
}

View File

@@ -2,7 +2,9 @@ package tap
import (
"fmt"
"runtime"
"sync"
"time"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
@@ -23,6 +25,16 @@ type tcpStreamFactory struct {
Emitter api.Emitter
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
}
var streams *sync.Map = &sync.Map{} // global
var streamId int64 = 0
var maxNumberOfGoroutines int
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
rlog.Debugf("* NEW: %s %s", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
@@ -39,15 +51,23 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
isTapTarget := props.isTapTarget
stream := &tcpStream{
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
superIdentifier: &api.SuperIdentifier{},
}
if stream.isTapTarget {
if runtime.NumGoroutine() > maxNumberOfGoroutines {
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
return stream
}
streamId++
stream.id = streamId
for i, extension := range extensions {
counterPair := &api.CounterPair{
Request: 0,
@@ -89,6 +109,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
emitter: factory.Emitter,
counterPair: counterPair,
})
streams.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
})
factory.wg.Add(2)
// Start reading from channel stream.reader.bytes
go stream.clients[i].run(&factory.wg)
@@ -119,7 +145,7 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
}
return &streamProps{isTapTarget: false, isOutgoing: false}
} else {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort))
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
return &streamProps{isTapTarget: true}
}
}