diff --git a/agent/main.go b/agent/main.go index 3be77822c..f31706288 100644 --- a/agent/main.go +++ b/agent/main.go @@ -152,7 +152,9 @@ func runInTapperMode() { } hostMode := os.Getenv(shared.HostModeEnvVar) == "1" - tapOpts := &tap.TapOpts{HostMode: hostMode} + tapOpts := &tap.TapOpts{ + HostMode: hostMode, + } filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) diff --git a/tap/api/stats_tracker.go b/tap/api/stats_tracker.go index e8905026f..d80f745f8 100644 --- a/tap/api/stats_tracker.go +++ b/tap/api/stats_tracker.go @@ -10,6 +10,7 @@ type AppStats struct { ProcessedBytes uint64 `json:"processedBytes"` PacketsCount uint64 `json:"packetsCount"` TcpPacketsCount uint64 `json:"tcpPacketsCount"` + IgnoredPacketsCount uint64 `json:"ignoredPacketsCount"` ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"` TlsConnectionsCount uint64 `json:"tlsConnectionsCount"` MatchedPairs uint64 `json:"matchedPairs"` @@ -33,6 +34,10 @@ func (as *AppStats) IncTcpPacketsCount() { atomic.AddUint64(&as.TcpPacketsCount, 1) } +func (as *AppStats) IncIgnoredPacketsCount() { + atomic.AddUint64(&as.IgnoredPacketsCount, 1) +} + func (as *AppStats) IncReassembledTcpPayloadsCount() { atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1) } @@ -55,6 +60,7 @@ func (as *AppStats) DumpStats() *AppStats { currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes) currentAppStats.PacketsCount = resetUint64(&as.PacketsCount) currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount) + currentAppStats.IgnoredPacketsCount = resetUint64(&as.IgnoredPacketsCount) currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount) currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount) currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs) diff --git a/tap/go.mod b/tap/go.mod index 86bfa6edc..cad3ace38 100644 --- a/tap/go.mod +++ b/tap/go.mod @@ -6,6 +6,7 @@ require ( github.com/cilium/ebpf v0.8.0 github.com/go-errors/errors v1.4.2 github.com/google/gopacket v1.1.19 + github.com/hashicorp/golang-lru v0.5.4 github.com/up9inc/mizu/logger v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0 github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 @@ -18,7 +19,6 @@ require ( github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/martian v2.1.0+incompatible // indirect - github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -33,7 +33,6 @@ require ( k8s.io/utils v0.0.0-20220127004650-9b3446523e65 // indirect sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect - sigs.k8s.io/yaml v1.3.0 // indirect ) replace github.com/up9inc/mizu/logger v0.0.0 => ../logger diff --git a/tap/go.sum b/tap/go.sum index 53b77414e..684d4f546 100644 --- a/tap/go.sum +++ b/tap/go.sum @@ -276,6 +276,5 @@ sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y= sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= +sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= -sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= -sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 43acb3b84..d049622b0 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -16,6 +16,7 @@ import ( "runtime" "strings" "time" + "strconv" "github.com/up9inc/mizu/logger" "github.com/up9inc/mizu/tap/api" @@ -41,6 +42,7 @@ var debug = flag.Bool("debug", false, "Display debug information") var quiet = flag.Bool("quiet", false, "Be quiet regarding errors") var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex") var procfs = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container") +var ignoredPorts = flag.String("ignore-ports", "", "A comma separated list of ports to ignore") // capture var iface = flag.String("i", "en0", "Interface to read packets from") @@ -55,7 +57,8 @@ var tls = flag.Bool("tls", false, "Enable TLS tapper") var memprofile = flag.String("memprofile", "", "Write memory profile") type TapOpts struct { - HostMode bool + HostMode bool + IgnoredPorts []uint16 } var extensions []*api.Extension // global @@ -193,6 +196,8 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI logger.Log.Fatal(err) } + opts.IgnoredPorts = append(opts.IgnoredPorts, buildIgnoredPortsList(*ignoredPorts)...) + assembler := NewTcpAssembler(outputItems, streamsMap, opts) return assembler @@ -267,3 +272,19 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne return &tls } + +func buildIgnoredPortsList(ignoredPorts string) []uint16 { + tmp := strings.Split(ignoredPorts, ",") + result := make([]uint16, len(tmp)) + + for i, raw := range tmp { + v, err := strconv.Atoi(raw) + if err != nil { + continue + } + + result[i] = uint16(v) + } + + return result +} diff --git a/tap/tcp_assembler.go b/tap/tcp_assembler.go index eee1dbbe0..f8df01458 100644 --- a/tap/tcp_assembler.go +++ b/tap/tcp_assembler.go @@ -23,6 +23,7 @@ type tcpAssembler struct { streamPool *reassembly.StreamPool streamFactory *tcpStreamFactory assemblerMutex sync.Mutex + ignoredPorts []uint16 } // Context @@ -48,8 +49,8 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection() maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal() - logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", - maxBufferedPagesTotal, maxBufferedPagesPerConnection) + logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%v", + maxBufferedPagesTotal, maxBufferedPagesPerConnection, opts) assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection @@ -57,6 +58,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp Assembler: assembler, streamPool: streamPool, streamFactory: streamFactory, + ignoredPorts: opts.IgnoredPorts, } } @@ -83,14 +85,18 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp diagnose.AppStats.IncTcpPacketsCount() tcp := tcp.(*layers.TCP) - c := context{ - CaptureInfo: packet.Metadata().CaptureInfo, - Origin: packetInfo.Source.Origin, + if a.shouldIgnorePort(uint16(tcp.DstPort)) { + diagnose.AppStats.IncIgnoredPacketsCount() + } else { + c := context{ + CaptureInfo: packet.Metadata().CaptureInfo, + Origin: packetInfo.Source.Origin, + } + diagnose.InternalStats.Totalsz += len(tcp.Payload) + a.assemblerMutex.Lock() + a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c) + a.assemblerMutex.Unlock() } - diagnose.InternalStats.Totalsz += len(tcp.Payload) - a.assemblerMutex.Lock() - a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c) - a.assemblerMutex.Unlock() } done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount @@ -132,3 +138,13 @@ func (a *tcpAssembler) waitAndDump() { logger.Log.Debugf("%s", a.Dump()) a.assemblerMutex.Unlock() } + +func (a *tcpAssembler) shouldIgnorePort(port uint16) bool { + for _, p := range a.ignoredPorts { + if port == p { + return true + } + } + + return false +}