mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-27 00:52:58 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f880417e9 | ||
|
|
6b52458642 | ||
|
|
858a64687d | ||
|
|
819ccf54cd | ||
|
|
7cc077c8a0 |
76
.github/CODE_OF_CONDUCT.md
vendored
Normal file
76
.github/CODE_OF_CONDUCT.md
vendored
Normal file
@@ -0,0 +1,76 @@
|
||||
# Contributor Covenant Code of Conduct
|
||||
|
||||
## Our Pledge
|
||||
|
||||
In the interest of fostering an open and welcoming environment, we as
|
||||
contributors and maintainers pledge to making participation in our project and
|
||||
our community a harassment-free experience for everyone, regardless of age, body
|
||||
size, disability, ethnicity, sex characteristics, gender identity and expression,
|
||||
level of experience, education, socio-economic status, nationality, personal
|
||||
appearance, race, religion, or sexual identity and orientation.
|
||||
|
||||
## Our Standards
|
||||
|
||||
Examples of behavior that contributes to creating a positive environment
|
||||
include:
|
||||
|
||||
* Using welcoming and inclusive language
|
||||
* Being respectful of differing viewpoints and experiences
|
||||
* Gracefully accepting constructive criticism
|
||||
* Focusing on what is best for the community
|
||||
* Showing empathy towards other community members
|
||||
|
||||
Examples of unacceptable behavior by participants include:
|
||||
|
||||
* The use of sexualized language or imagery and unwelcome sexual attention or
|
||||
advances
|
||||
* Trolling, insulting/derogatory comments, and personal or political attacks
|
||||
* Public or private harassment
|
||||
* Publishing others' private information, such as a physical or electronic
|
||||
address, without explicit permission
|
||||
* Other conduct which could reasonably be considered inappropriate in a
|
||||
professional setting
|
||||
|
||||
## Our Responsibilities
|
||||
|
||||
Project maintainers are responsible for clarifying the standards of acceptable
|
||||
behavior and are expected to take appropriate and fair corrective action in
|
||||
response to any instances of unacceptable behavior.
|
||||
|
||||
Project maintainers have the right and responsibility to remove, edit, or
|
||||
reject comments, commits, code, wiki edits, issues, and other contributions
|
||||
that are not aligned to this Code of Conduct, or to ban temporarily or
|
||||
permanently any contributor for other behaviors that they deem inappropriate,
|
||||
threatening, offensive, or harmful.
|
||||
|
||||
## Scope
|
||||
|
||||
This Code of Conduct applies both within project spaces and in public spaces
|
||||
when an individual is representing the project or its community. Examples of
|
||||
representing a project or community include using an official project e-mail
|
||||
address, posting via an official social media account, or acting as an appointed
|
||||
representative at an online or offline event. Representation of a project may be
|
||||
further defined and clarified by project maintainers.
|
||||
|
||||
## Enforcement
|
||||
|
||||
Instances of abusive, harassing, or otherwise unacceptable behavior may be
|
||||
reported by contacting the project team at mizu@up9.com. All
|
||||
complaints will be reviewed and investigated and will result in a response that
|
||||
is deemed necessary and appropriate to the circumstances. The project team is
|
||||
obligated to maintain confidentiality with regard to the reporter of an incident.
|
||||
Further details of specific enforcement policies may be posted separately.
|
||||
|
||||
Project maintainers who do not follow or enforce the Code of Conduct in good
|
||||
faith may face temporary or permanent repercussions as determined by other
|
||||
members of the project's leadership.
|
||||
|
||||
## Attribution
|
||||
|
||||
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
|
||||
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
|
||||
|
||||
[homepage]: https://www.contributor-covenant.org
|
||||
|
||||
For answers to common questions about this code of conduct, see
|
||||
https://www.contributor-covenant.org/faq
|
||||
4
.github/workflows/pr_validation.yml
vendored
4
.github/workflows/pr_validation.yml
vendored
@@ -18,7 +18,7 @@ jobs:
|
||||
- name: Set up Go 1.16
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16'
|
||||
go-version: '1.16'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
@@ -33,7 +33,7 @@ jobs:
|
||||
- name: Set up Go 1.16
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16'
|
||||
go-version: '1.16'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -26,3 +26,6 @@ build
|
||||
|
||||
# Environment variables
|
||||
.env
|
||||
|
||||
# pprof
|
||||
pprof/*
|
||||
|
||||
@@ -3,7 +3,6 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
@@ -63,8 +62,8 @@ func openBrowser(url string) {
|
||||
default:
|
||||
err = fmt.Errorf("unsupported platform")
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("error while opening browser, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
||||
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
|
||||
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
|
||||
)
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(
|
||||
|
||||
@@ -8,4 +8,5 @@ const (
|
||||
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
|
||||
RulePolicyPath = "/app/enforce-policy/"
|
||||
RulePolicyFileName = "enforce-policy.yaml"
|
||||
GoGCEnvVar = "GOGC"
|
||||
)
|
||||
|
||||
@@ -21,7 +21,7 @@ type Protocol struct {
|
||||
}
|
||||
|
||||
type Extension struct {
|
||||
Protocol Protocol
|
||||
Protocol *Protocol
|
||||
Path string
|
||||
Plug *plugin.Plugin
|
||||
Dissector Dissector
|
||||
@@ -72,10 +72,15 @@ type SuperTimer struct {
|
||||
CaptureTime time.Time
|
||||
}
|
||||
|
||||
type SuperIdentifier struct {
|
||||
Protocol *Protocol
|
||||
IsClosedOthers bool
|
||||
}
|
||||
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
Ping()
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, emitter Emitter) error
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
|
||||
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
|
||||
Summarize(entry *MizuEntry) *BaseEntryDetails
|
||||
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
|
||||
|
||||
@@ -32,7 +32,7 @@ func init() {
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = protocol
|
||||
extension.Protocol = &protocol
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
|
||||
|
||||
const amqpRequest string = "amqp_request"
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
||||
r := AmqpReader{b}
|
||||
|
||||
var remaining int
|
||||
@@ -78,13 +78,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
var lastMethodFrameMessage Message
|
||||
|
||||
for {
|
||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
frame, err := r.ReadFrame()
|
||||
if err == io.EOF {
|
||||
// We must read until we see an EOF... very important!
|
||||
return errors.New("AMQP EOF")
|
||||
} else if err != nil {
|
||||
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
|
||||
return err
|
||||
}
|
||||
|
||||
switch f := frame.(type) {
|
||||
@@ -101,6 +102,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
case *BasicDeliver:
|
||||
eventBasicDeliver.Properties = header.Properties
|
||||
default:
|
||||
frame = nil
|
||||
}
|
||||
|
||||
case *BodyFrame:
|
||||
@@ -110,11 +112,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
switch lastMethodFrameMessage.(type) {
|
||||
case *BasicPublish:
|
||||
eventBasicPublish.Body = f.Body
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
case *BasicDeliver:
|
||||
eventBasicDeliver.Body = f.Body
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
default:
|
||||
body = nil
|
||||
frame = nil
|
||||
}
|
||||
|
||||
case *MethodFrame:
|
||||
@@ -134,6 +140,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *BasicConsume:
|
||||
@@ -146,6 +153,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *BasicDeliver:
|
||||
@@ -165,6 +173,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *ExchangeDeclare:
|
||||
@@ -178,6 +187,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *ConnectionStart:
|
||||
@@ -188,6 +198,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
Mechanisms: m.Mechanisms,
|
||||
Locales: m.Locales,
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *ConnectionClose:
|
||||
@@ -197,9 +208,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
ClassId: m.ClassId,
|
||||
MethodId: m.MethodId,
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
default:
|
||||
frame = nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
|
||||
channel := binary.BigEndian.Uint16(scratch[1:3])
|
||||
size := binary.BigEndian.Uint32(scratch[3:7])
|
||||
|
||||
if size > 1000000 {
|
||||
if size > 1000000*16 {
|
||||
return nil, ErrMaxSize
|
||||
}
|
||||
|
||||
@@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
|
||||
return
|
||||
}
|
||||
|
||||
if hf.Size > 512 {
|
||||
return nil, ErrMaxHeaderFrameSize
|
||||
}
|
||||
|
||||
var flags uint16
|
||||
|
||||
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
|
||||
@@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
|
||||
}
|
||||
|
||||
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
|
||||
bf.Body = nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
@@ -19,32 +18,35 @@ import (
|
||||
// ErrCredentials. The text of the error is likely more interesting than
|
||||
// these constants.
|
||||
const (
|
||||
frameMethod = 1
|
||||
frameHeader = 2
|
||||
frameBody = 3
|
||||
frameHeartbeat = 8
|
||||
frameMinSize = 4096
|
||||
frameEnd = 206
|
||||
replySuccess = 200
|
||||
ContentTooLarge = 311
|
||||
NoRoute = 312
|
||||
NoConsumers = 313
|
||||
ConnectionForced = 320
|
||||
InvalidPath = 402
|
||||
AccessRefused = 403
|
||||
NotFound = 404
|
||||
ResourceLocked = 405
|
||||
PreconditionFailed = 406
|
||||
FrameError = 501
|
||||
SyntaxError = 502
|
||||
CommandInvalid = 503
|
||||
ChannelError = 504
|
||||
UnexpectedFrame = 505
|
||||
ResourceError = 506
|
||||
NotAllowed = 530
|
||||
NotImplemented = 540
|
||||
InternalError = 541
|
||||
MaxSizeError = 551
|
||||
frameMethod = 1
|
||||
frameHeader = 2
|
||||
frameBody = 3
|
||||
frameHeartbeat = 8
|
||||
frameMinSize = 4096
|
||||
frameEnd = 206
|
||||
replySuccess = 200
|
||||
ContentTooLarge = 311
|
||||
NoRoute = 312
|
||||
NoConsumers = 313
|
||||
ConnectionForced = 320
|
||||
InvalidPath = 402
|
||||
AccessRefused = 403
|
||||
NotFound = 404
|
||||
ResourceLocked = 405
|
||||
PreconditionFailed = 406
|
||||
FrameError = 501
|
||||
SyntaxError = 502
|
||||
CommandInvalid = 503
|
||||
ChannelError = 504
|
||||
UnexpectedFrame = 505
|
||||
ResourceError = 506
|
||||
NotAllowed = 530
|
||||
NotImplemented = 540
|
||||
InternalError = 541
|
||||
MaxSizeError = 551
|
||||
MaxHeaderFrameSizeError = 552
|
||||
BadMethodFrameUnknownMethod = 601
|
||||
BadMethodFrameUnknownClass = 602
|
||||
)
|
||||
|
||||
func isSoftExceptionCode(code int) bool {
|
||||
@@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 20: // channel
|
||||
@@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 40: // exchange
|
||||
@@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 50: // queue
|
||||
@@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 60: // basic
|
||||
@@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 90: // tx
|
||||
@@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 85: // confirm
|
||||
@@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownClass
|
||||
}
|
||||
|
||||
return mf, nil
|
||||
|
||||
@@ -60,8 +60,13 @@ var (
|
||||
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
|
||||
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
|
||||
|
||||
// ErrClosed is returned when the channel or connection is not open
|
||||
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
|
||||
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
|
||||
|
||||
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
|
||||
|
||||
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
|
||||
|
||||
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
|
||||
)
|
||||
|
||||
// Error captures the code and reason a channel or connection has been closed
|
||||
|
||||
@@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
@@ -52,7 +53,7 @@ func init() {
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = protocol
|
||||
extension.Protocol = &protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
@@ -60,7 +61,7 @@ func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
||||
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
|
||||
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
|
||||
if err != nil {
|
||||
@@ -77,8 +78,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
grpcAssembler = createGrpcAssembler(b)
|
||||
}
|
||||
|
||||
success := false
|
||||
dissected := false
|
||||
for {
|
||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
if isHTTP2 {
|
||||
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
@@ -87,7 +92,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
success = true
|
||||
dissected = true
|
||||
} else if isClient {
|
||||
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
@@ -96,7 +101,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
success = true
|
||||
dissected = true
|
||||
} else {
|
||||
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
@@ -105,13 +110,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
success = true
|
||||
dissected = true
|
||||
}
|
||||
}
|
||||
|
||||
if !success {
|
||||
if !dissected {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
@@ -30,7 +31,7 @@ func init() {
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = _protocol
|
||||
extension.Protocol = &_protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
@@ -38,18 +39,24 @@ func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", _protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
||||
for {
|
||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
if isClient {
|
||||
_, _, err := ReadRequest(b, tcpID, superTimer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &_protocol
|
||||
} else {
|
||||
err := ReadResponse(b, tcpID, superTimer, emitter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &_protocol
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,11 +13,13 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -94,6 +96,8 @@ var ownIps []string // global
|
||||
var hostMode bool // global
|
||||
var extensions []*api.Extension // global
|
||||
|
||||
const baseStreamChannelTimeoutMs int = 5000 * 100
|
||||
|
||||
/* minOutputLevel: Error will be printed only if outputLevel is above this value
|
||||
* t: key for errorsMap (counting errors)
|
||||
* s, a: arguments log.Printf
|
||||
@@ -168,11 +172,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
|
||||
}
|
||||
|
||||
func startMemoryProfiler() {
|
||||
dirname := "/app/pprof"
|
||||
rlog.Info("Profiling is on, results will be written to %s", dirname)
|
||||
dumpPath := "/app/pprof"
|
||||
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
|
||||
if envDumpPath != "" {
|
||||
dumpPath = envDumpPath
|
||||
}
|
||||
timeInterval := 60
|
||||
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
|
||||
if envTimeInterval != "" {
|
||||
if i, err := strconv.Atoi(envTimeInterval); err == nil {
|
||||
timeInterval = i
|
||||
}
|
||||
}
|
||||
|
||||
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
|
||||
go func() {
|
||||
if _, err := os.Stat(dirname); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dirname, 0777); err != nil {
|
||||
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dumpPath, 0777); err != nil {
|
||||
log.Fatal("could not create directory for profile: ", err)
|
||||
}
|
||||
}
|
||||
@@ -180,9 +196,9 @@ func startMemoryProfiler() {
|
||||
for true {
|
||||
t := time.Now()
|
||||
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
|
||||
|
||||
rlog.Info("Writing memory profile to %s\n", filename)
|
||||
rlog.Infof("Writing memory profile to %s\n", filename)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
@@ -193,13 +209,50 @@ func startMemoryProfiler() {
|
||||
log.Fatal("could not write memory profile: ", err)
|
||||
}
|
||||
_ = f.Close()
|
||||
time.Sleep(time.Minute)
|
||||
time.Sleep(time.Second * time.Duration(timeInterval))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func closeTimedoutTcpStreamChannels() {
|
||||
maxNumberOfGoroutines = GetMaxNumberOfGoroutines()
|
||||
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
streams.Range(func(key interface{}, value interface{}) bool {
|
||||
streamWrapper := value.(*tcpStreamWrapper)
|
||||
stream := streamWrapper.stream
|
||||
if stream.superIdentifier.Protocol == nil {
|
||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
|
||||
stream.Close()
|
||||
statsTracker.incDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||
}
|
||||
} else {
|
||||
if !stream.superIdentifier.IsClosedOthers {
|
||||
for i := range stream.clients {
|
||||
reader := &stream.clients[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
for i := range stream.servers {
|
||||
reader := &stream.servers[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
stream.superIdentifier.IsClosedOthers = true
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
|
||||
go closeTimedoutTcpStreamChannels()
|
||||
|
||||
defer util.Run()()
|
||||
if *debug {
|
||||
@@ -354,7 +407,14 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
startMemoryProfiler()
|
||||
}
|
||||
|
||||
for packet := range source.Packets() {
|
||||
for {
|
||||
packet, err := source.NextPacket()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
rlog.Debugf("Error:", err)
|
||||
continue
|
||||
}
|
||||
packetsCount := statsTracker.incPacketsCount()
|
||||
rlog.Debugf("PACKET #%d", packetsCount)
|
||||
data := packet.Data()
|
||||
|
||||
@@ -3,14 +3,21 @@ package tap
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
|
||||
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
|
||||
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
|
||||
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
|
||||
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
|
||||
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
|
||||
MaxNumberOfGoroutinesEnvVarName = "MAX_NUMBER_OF_GOROUTINES"
|
||||
MaxBufferedPagesTotalDefaultValue = 5000
|
||||
MaxBufferedPagesPerConnectionDefaultValue = 5000
|
||||
TcpStreamChannelTimeoutMsDefaultValue = 5000
|
||||
MaxNumberOfGoroutinesDefaultValue = 4000
|
||||
)
|
||||
|
||||
type globalSettings struct {
|
||||
@@ -47,6 +54,22 @@ func GetMaxBufferedPagesPerConnection() int {
|
||||
return valueFromEnv
|
||||
}
|
||||
|
||||
func GetTcpChannelTimeoutMs() time.Duration {
|
||||
valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName))
|
||||
if err != nil {
|
||||
return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond
|
||||
}
|
||||
return time.Duration(valueFromEnv) * time.Millisecond
|
||||
}
|
||||
|
||||
func GetMaxNumberOfGoroutines() int {
|
||||
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxNumberOfGoroutinesEnvVarName))
|
||||
if err != nil {
|
||||
return MaxNumberOfGoroutinesDefaultValue
|
||||
}
|
||||
return valueFromEnv
|
||||
}
|
||||
|
||||
func GetMemoryProfilingEnabled() bool {
|
||||
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ type AppStats struct {
|
||||
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
|
||||
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
|
||||
MatchedPairs int64 `json:"matchedPairs"`
|
||||
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
|
||||
}
|
||||
|
||||
type StatsTracker struct {
|
||||
@@ -23,6 +24,7 @@ type StatsTracker struct {
|
||||
reassembledTcpPayloadsCountMutex sync.Mutex
|
||||
tlsConnectionsCountMutex sync.Mutex
|
||||
matchedPairsMutex sync.Mutex
|
||||
droppedTcpStreamsMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incMatchedPairs() {
|
||||
@@ -31,6 +33,12 @@ func (st *StatsTracker) incMatchedPairs() {
|
||||
st.matchedPairsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incDroppedTcpStreams() {
|
||||
st.droppedTcpStreamsMutex.Lock()
|
||||
st.appStats.DroppedTcpStreams++
|
||||
st.droppedTcpStreamsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incPacketsCount() int64 {
|
||||
st.packetsCountMutex.Lock()
|
||||
st.appStats.PacketsCount++
|
||||
@@ -100,5 +108,10 @@ func (st *StatsTracker) dumpStats() *AppStats {
|
||||
st.appStats.MatchedPairs = 0
|
||||
st.matchedPairsMutex.Unlock()
|
||||
|
||||
st.droppedTcpStreamsMutex.Lock()
|
||||
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
|
||||
st.appStats.DroppedTcpStreams = 0
|
||||
st.droppedTcpStreamsMutex.Unlock()
|
||||
|
||||
return currentAppStats
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ func (tid *tcpID) String() string {
|
||||
type tcpReader struct {
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
@@ -59,6 +60,7 @@ type tcpReader struct {
|
||||
extension *api.Extension
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
@@ -93,10 +95,19 @@ func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (h *tcpReader) Close() {
|
||||
h.Lock()
|
||||
if !h.isClosed {
|
||||
h.isClosed = true
|
||||
close(h.msgQueue)
|
||||
}
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
func (h *tcpReader) run(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
b := bufio.NewReader(h)
|
||||
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.emitter)
|
||||
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
|
||||
if err != nil {
|
||||
io.Copy(ioutil.Discard, b)
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
/* It's a connection (bidirectional)
|
||||
@@ -16,16 +17,19 @@ import (
|
||||
* In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel.
|
||||
*/
|
||||
type tcpStream struct {
|
||||
tcpstate *reassembly.TCPSimpleFSM
|
||||
fsmerr bool
|
||||
optchecker reassembly.TCPOptionCheck
|
||||
net, transport gopacket.Flow
|
||||
isDNS bool
|
||||
isTapTarget bool
|
||||
clients []tcpReader
|
||||
servers []tcpReader
|
||||
urls []string
|
||||
ident string
|
||||
id int64
|
||||
isClosed bool
|
||||
superIdentifier *api.SuperIdentifier
|
||||
tcpstate *reassembly.TCPSimpleFSM
|
||||
fsmerr bool
|
||||
optchecker reassembly.TCPOptionCheck
|
||||
net, transport gopacket.Flow
|
||||
isDNS bool
|
||||
isTapTarget bool
|
||||
clients []tcpReader
|
||||
servers []tcpReader
|
||||
urls []string
|
||||
ident string
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
@@ -146,12 +150,22 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
statsTracker.incReassembledTcpPayloadsCount()
|
||||
timestamp := ac.GetCaptureInfo().Timestamp
|
||||
if dir == reassembly.TCPDirClientToServer {
|
||||
for _, reader := range t.clients {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
|
||||
for i := range t.clients {
|
||||
reader := &t.clients[i]
|
||||
reader.Lock()
|
||||
if !reader.isClosed {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
|
||||
}
|
||||
reader.Unlock()
|
||||
}
|
||||
} else {
|
||||
for _, reader := range t.servers {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
|
||||
for i := range t.servers {
|
||||
reader := &t.servers[i]
|
||||
reader.Lock()
|
||||
if !reader.isClosed {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
|
||||
}
|
||||
reader.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -160,14 +174,33 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
|
||||
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
|
||||
Trace("%s: Connection closed", t.ident)
|
||||
if t.isTapTarget {
|
||||
for _, reader := range t.clients {
|
||||
close(reader.msgQueue)
|
||||
}
|
||||
for _, reader := range t.servers {
|
||||
close(reader.msgQueue)
|
||||
}
|
||||
if t.isTapTarget && !t.isClosed {
|
||||
t.Close()
|
||||
}
|
||||
// do not remove the connection to allow last ACK
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *tcpStream) Close() {
|
||||
shouldReturn := false
|
||||
t.Lock()
|
||||
if t.isClosed {
|
||||
shouldReturn = true
|
||||
} else {
|
||||
t.isClosed = true
|
||||
}
|
||||
t.Unlock()
|
||||
if shouldReturn {
|
||||
return
|
||||
}
|
||||
streams.Delete(t.id)
|
||||
|
||||
for i := range t.clients {
|
||||
reader := &t.clients[i]
|
||||
reader.Close()
|
||||
}
|
||||
for i := range t.servers {
|
||||
reader := &t.servers[i]
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,9 @@ package tap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
@@ -23,6 +25,16 @@ type tcpStreamFactory struct {
|
||||
Emitter api.Emitter
|
||||
}
|
||||
|
||||
type tcpStreamWrapper struct {
|
||||
stream *tcpStream
|
||||
createdAt time.Time
|
||||
}
|
||||
|
||||
var streams *sync.Map = &sync.Map{} // global
|
||||
var streamId int64 = 0
|
||||
|
||||
var maxNumberOfGoroutines int
|
||||
|
||||
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
|
||||
rlog.Debugf("* NEW: %s %s", net, transport)
|
||||
fsmOptions := reassembly.TCPSimpleFSMOptions{
|
||||
@@ -39,15 +51,23 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
|
||||
isTapTarget := props.isTapTarget
|
||||
stream := &tcpStream{
|
||||
net: net,
|
||||
transport: transport,
|
||||
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
|
||||
isTapTarget: isTapTarget,
|
||||
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
|
||||
ident: fmt.Sprintf("%s:%s", net, transport),
|
||||
optchecker: reassembly.NewTCPOptionCheck(),
|
||||
net: net,
|
||||
transport: transport,
|
||||
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
|
||||
isTapTarget: isTapTarget,
|
||||
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
|
||||
ident: fmt.Sprintf("%s:%s", net, transport),
|
||||
optchecker: reassembly.NewTCPOptionCheck(),
|
||||
superIdentifier: &api.SuperIdentifier{},
|
||||
}
|
||||
if stream.isTapTarget {
|
||||
if runtime.NumGoroutine() > maxNumberOfGoroutines {
|
||||
statsTracker.incDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
|
||||
return stream
|
||||
}
|
||||
streamId++
|
||||
stream.id = streamId
|
||||
for i, extension := range extensions {
|
||||
counterPair := &api.CounterPair{
|
||||
Request: 0,
|
||||
@@ -89,6 +109,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
})
|
||||
|
||||
streams.Store(stream.id, &tcpStreamWrapper{
|
||||
stream: stream,
|
||||
createdAt: time.Now(),
|
||||
})
|
||||
|
||||
factory.wg.Add(2)
|
||||
// Start reading from channel stream.reader.bytes
|
||||
go stream.clients[i].run(&factory.wg)
|
||||
@@ -119,7 +145,7 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
|
||||
}
|
||||
return &streamProps{isTapTarget: false, isOutgoing: false}
|
||||
} else {
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort))
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
|
||||
return &streamProps{isTapTarget: true}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user