stop tapping self tapper traffic (#1083)

* stop tapping self tapper traffic

* run go mod tidy

* allow to explicitly ignore ports

* remove unused code

* remove shared from tap + go mod tidy

* move ignroe ports to tapper

* rename TapperPacketsCount to IgnoredPacketsCount

* don't check null - go is smart

* remove nil check
This commit is contained in:
David Levanon 2022-05-18 15:13:10 +03:00 committed by GitHub
parent 948af518b5
commit a9de4f0bba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 15 deletions

View File

@ -152,7 +152,9 @@ func runInTapperMode() {
}
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tapOpts := &tap.TapOpts{
HostMode: hostMode,
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)

View File

@ -10,6 +10,7 @@ type AppStats struct {
ProcessedBytes uint64 `json:"processedBytes"`
PacketsCount uint64 `json:"packetsCount"`
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
IgnoredPacketsCount uint64 `json:"ignoredPacketsCount"`
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
MatchedPairs uint64 `json:"matchedPairs"`
@ -33,6 +34,10 @@ func (as *AppStats) IncTcpPacketsCount() {
atomic.AddUint64(&as.TcpPacketsCount, 1)
}
func (as *AppStats) IncIgnoredPacketsCount() {
atomic.AddUint64(&as.IgnoredPacketsCount, 1)
}
func (as *AppStats) IncReassembledTcpPayloadsCount() {
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
}
@ -55,6 +60,7 @@ func (as *AppStats) DumpStats() *AppStats {
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
currentAppStats.IgnoredPacketsCount = resetUint64(&as.IgnoredPacketsCount)
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)

View File

@ -6,6 +6,7 @@ require (
github.com/cilium/ebpf v0.8.0
github.com/go-errors/errors v1.4.2
github.com/google/gopacket v1.1.19
github.com/hashicorp/golang-lru v0.5.4
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
@ -18,7 +19,6 @@ require (
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/martian v2.1.0+incompatible // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@ -33,7 +33,6 @@ require (
k8s.io/utils v0.0.0-20220127004650-9b3446523e65 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger

View File

@ -276,6 +276,5 @@ sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=

View File

@ -16,6 +16,7 @@ import (
"runtime"
"strings"
"time"
"strconv"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
@ -41,6 +42,7 @@ var debug = flag.Bool("debug", false, "Display debug information")
var quiet = flag.Bool("quiet", false, "Be quiet regarding errors")
var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")
var procfs = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container")
var ignoredPorts = flag.String("ignore-ports", "", "A comma separated list of ports to ignore")
// capture
var iface = flag.String("i", "en0", "Interface to read packets from")
@ -55,7 +57,8 @@ var tls = flag.Bool("tls", false, "Enable TLS tapper")
var memprofile = flag.String("memprofile", "", "Write memory profile")
type TapOpts struct {
HostMode bool
HostMode bool
IgnoredPorts []uint16
}
var extensions []*api.Extension // global
@ -193,6 +196,8 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
logger.Log.Fatal(err)
}
opts.IgnoredPorts = append(opts.IgnoredPorts, buildIgnoredPortsList(*ignoredPorts)...)
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
return assembler
@ -267,3 +272,19 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
return &tls
}
func buildIgnoredPortsList(ignoredPorts string) []uint16 {
tmp := strings.Split(ignoredPorts, ",")
result := make([]uint16, len(tmp))
for i, raw := range tmp {
v, err := strconv.Atoi(raw)
if err != nil {
continue
}
result[i] = uint16(v)
}
return result
}

View File

@ -23,6 +23,7 @@ type tcpAssembler struct {
streamPool *reassembly.StreamPool
streamFactory *tcpStreamFactory
assemblerMutex sync.Mutex
ignoredPorts []uint16
}
// Context
@ -48,8 +49,8 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
maxBufferedPagesTotal, maxBufferedPagesPerConnection)
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%v",
maxBufferedPagesTotal, maxBufferedPagesPerConnection, opts)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
@ -57,6 +58,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
Assembler: assembler,
streamPool: streamPool,
streamFactory: streamFactory,
ignoredPorts: opts.IgnoredPorts,
}
}
@ -83,14 +85,18 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
diagnose.AppStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
Origin: packetInfo.Source.Origin,
if a.shouldIgnorePort(uint16(tcp.DstPort)) {
diagnose.AppStats.IncIgnoredPacketsCount()
} else {
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
Origin: packetInfo.Source.Origin,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
@ -132,3 +138,13 @@ func (a *tcpAssembler) waitAndDump() {
logger.Log.Debugf("%s", a.Dump())
a.assemblerMutex.Unlock()
}
func (a *tcpAssembler) shouldIgnorePort(port uint16) bool {
for _, p := range a.ignoredPorts {
if port == p {
return true
}
}
return false
}