// Mirror of https://github.com/kubeshark/kubeshark.git
// Synced 2025-04-29 04:16:29 +00:00
// 336 lines, 10 KiB, Go
// Copyright 2012 Google, Inc. All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license
|
|
// that can be found in the LICENSE file in the root of the source
|
|
// tree.
|
|
|
|
// The pcapdump binary implements a tcpdump-like command line tool with gopacket
|
|
// using pcap as a backend data collection mechanism.
|
|
package tap
|
|
|
|
import (
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/shirou/gopsutil/cpu"
|
|
"github.com/struCoder/pidusage"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
"github.com/up9inc/mizu/tap/diagnose"
|
|
"github.com/up9inc/mizu/tap/source"
|
|
"github.com/up9inc/mizu/tap/tlstapper"
|
|
v1 "k8s.io/api/core/v1"
|
|
)
|
|
|
|
// cleanPeriod is the period handed to the Cleaner built in startPassiveTapper.
const cleanPeriod = 10 * time.Second
|
|
|
|
// Command-line flags of the tapper. Each variable points at the parsed value
// of the flag registered under the name given as first argument.
var (
	maxcount   = flag.Int64("c", -1, "Only grab this many packets, then exit")
	decoder    = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
	statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
	lazy       = flag.Bool("lazy", false, "If true, do lazy decoding")
	nodefrag   = flag.Bool("nodefrag", false, "If true, do not do IPv4 defrag")

	// global
	checksum         = flag.Bool("checksum", false, "Check TCP checksum")
	nooptcheck       = flag.Bool("nooptcheck", true, "Do not check TCP options (useful to ignore MSS on captures with TSO)")
	ignorefsmerr     = flag.Bool("ignorefsmerr", true, "Ignore TCP FSM errors")
	allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams without SYN/SYN+ACK/ACK sequence")

	verbose        = flag.Bool("verbose", false, "Be verbose")
	debug          = flag.Bool("debug", false, "Display debug information")
	quiet          = flag.Bool("quiet", false, "Be quiet regarding errors")
	hexdumppkt     = flag.Bool("dumppkt", false, "Dump packet as hex")
	procfs         = flag.String("procfs", "/proc", "The procfs directory, used when mapping host volumes into a container")
	ignoredPorts   = flag.String("ignore-ports", "", "A comma separated list of ports to ignore")
	maxLiveStreams = flag.Int("max-live-streams", 500, "Maximum live streams to handle concurrently")

	// capture
	iface               = flag.String("i", "en0", "Interface to read packets from")
	fname               = flag.String("r", "", "Filename to read from, overrides -i")
	snaplen             = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet")
	tstype              = flag.String("timestamp_type", "", "Type of timestamps to use")
	promisc             = flag.Bool("promisc", true, "Set promiscuous mode")
	staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
	servicemesh         = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
	tls                 = flag.Bool("tls", false, "Enable TLS tapper")

	memprofile = flag.String("memprofile", "", "Write memory profile")
)
|
|
|
|
// TapOpts is the tapper configuration handed to StartPassiveTapper and
// forwarded to NewTcpAssembler.
type TapOpts struct {
	// HostMode is supplied by the caller; its consumers are outside this
	// section of the file.
	HostMode bool
	// IgnoredPorts lists ports to ignore. initializePassiveTapper appends the
	// ports parsed from the -ignore-ports flag to whatever the caller set.
	IgnoredPorts []uint16
	// maxLiveStreams is overwritten by initializePassiveTapper with the value
	// of the -max-live-streams flag.
	maxLiveStreams int
	// staleConnectionTimeout is overwritten by initializePassiveTapper with
	// the -staletimout flag converted from seconds.
	staleConnectionTimeout time.Duration
}
|
|
|
|
var extensions []*api.Extension // global
|
|
var filteringOptions *api.TrafficFilteringOptions // global
|
|
var tapTargets []v1.Pod // global
|
|
var packetSourceManager *source.PacketSourceManager // global
|
|
var mainPacketInputChan chan source.TcpPacketInfo // global
|
|
var tlsTapperInstance *tlstapper.TlsTapper // global
|
|
|
|
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
|
|
extensions = extensionsRef
|
|
filteringOptions = options
|
|
|
|
streamsMap := NewTcpStreamMap()
|
|
|
|
if *tls {
|
|
for _, e := range extensions {
|
|
if e.Protocol.Name == "http" {
|
|
tlsTapperInstance = startTlsTapper(e, outputItems, options, streamsMap)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if GetMemoryProfilingEnabled() {
|
|
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
|
|
}
|
|
|
|
assembler, err := initializePassiveTapper(opts, outputItems, streamsMap)
|
|
|
|
if err != nil {
|
|
logger.Log.Errorf("Error initializing tapper %w", err)
|
|
return
|
|
}
|
|
|
|
go startPassiveTapper(streamsMap, assembler)
|
|
}
|
|
|
|
func UpdateTapTargets(newTapTargets []v1.Pod) {
|
|
success := true
|
|
|
|
tapTargets = newTapTargets
|
|
|
|
packetSourceManager.UpdatePods(tapTargets, !*nodefrag, mainPacketInputChan)
|
|
|
|
if tlsTapperInstance != nil && os.Getenv("MIZU_GLOBAL_GOLANG_PID") == "" {
|
|
if err := tlstapper.UpdateTapTargets(tlsTapperInstance, &tapTargets, *procfs); err != nil {
|
|
tlstapper.LogError(err)
|
|
success = false
|
|
}
|
|
}
|
|
|
|
printNewTapTargets(success)
|
|
}
|
|
|
|
func printNewTapTargets(success bool) {
|
|
printStr := ""
|
|
for _, tapTarget := range tapTargets {
|
|
printStr += fmt.Sprintf("%s (%s), ", tapTarget.Status.PodIP, tapTarget.Name)
|
|
}
|
|
printStr = strings.TrimRight(printStr, ", ")
|
|
|
|
if success {
|
|
logger.Log.Infof("Now tapping: %s", printStr)
|
|
} else {
|
|
logger.Log.Errorf("Failed to start tapping: %s", printStr)
|
|
}
|
|
}
|
|
|
|
func printPeriodicStats(cleaner *Cleaner, assembler *tcpAssembler) {
|
|
statsPeriod := time.Second * time.Duration(*statsevery)
|
|
ticker := time.NewTicker(statsPeriod)
|
|
|
|
logicalCoreCount, err := cpu.Counts(true)
|
|
if err != nil {
|
|
logicalCoreCount = -1
|
|
}
|
|
|
|
physicalCoreCount, err := cpu.Counts(false)
|
|
if err != nil {
|
|
physicalCoreCount = -1
|
|
}
|
|
|
|
for {
|
|
<-ticker.C
|
|
|
|
// Since the start
|
|
errorMapLen, errorsSummery := diagnose.TapErrors.GetErrorsSummary()
|
|
|
|
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
|
time.Since(diagnose.AppStats.StartTime),
|
|
diagnose.TapErrors.ErrorsCount,
|
|
errorMapLen,
|
|
errorsSummery,
|
|
)
|
|
|
|
// At this moment
|
|
memStats := runtime.MemStats{}
|
|
runtime.ReadMemStats(&memStats)
|
|
sysInfo, err := pidusage.GetStat(os.Getpid())
|
|
if err != nil {
|
|
sysInfo = &pidusage.SysInfo{
|
|
CPU: -1,
|
|
Memory: -1,
|
|
}
|
|
}
|
|
logger.Log.Infof(
|
|
"heap-alloc: %d, heap-idle: %d, heap-objects: %d, goroutines: %d, cpu: %f, cores: %d/%d, rss: %f",
|
|
memStats.HeapAlloc,
|
|
memStats.HeapIdle,
|
|
memStats.HeapObjects,
|
|
runtime.NumGoroutine(),
|
|
sysInfo.CPU,
|
|
logicalCoreCount,
|
|
physicalCoreCount,
|
|
sysInfo.Memory)
|
|
|
|
// Since the last print
|
|
cleanStats := cleaner.dumpStats()
|
|
assemblerStats := assembler.DumpStats()
|
|
logger.Log.Infof(
|
|
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
|
|
assemblerStats.flushedConnections,
|
|
assemblerStats.closedConnections,
|
|
cleanStats.deleted,
|
|
)
|
|
currentAppStats := diagnose.AppStats.DumpStats()
|
|
appStatsJSON, _ := json.Marshal(currentAppStats)
|
|
logger.Log.Infof("app stats - %v", string(appStatsJSON))
|
|
|
|
// At the moment
|
|
logger.Log.Infof("assembler-stats: %s, packet-source-stats: %s", assembler.Dump(), packetSourceManager.Stats())
|
|
}
|
|
}
|
|
|
|
func initializePacketSources() error {
|
|
if packetSourceManager != nil {
|
|
packetSourceManager.Close()
|
|
}
|
|
|
|
var bpffilter string
|
|
if len(flag.Args()) > 0 {
|
|
bpffilter = strings.Join(flag.Args(), " ")
|
|
}
|
|
|
|
behaviour := source.TcpPacketSourceBehaviour{
|
|
SnapLength: *snaplen,
|
|
Promisc: *promisc,
|
|
Tstype: *tstype,
|
|
DecoderName: *decoder,
|
|
Lazy: *lazy,
|
|
BpfFilter: bpffilter,
|
|
}
|
|
|
|
var err error
|
|
packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour, !*nodefrag, mainPacketInputChan)
|
|
return err
|
|
}
|
|
|
|
func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, streamsMap api.TcpStreamMap) (*tcpAssembler, error) {
|
|
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
|
diagnose.InitializeTapperInternalStats()
|
|
|
|
mainPacketInputChan = make(chan source.TcpPacketInfo)
|
|
|
|
if err := initializePacketSources(); err != nil {
|
|
logger.Log.Fatal(err)
|
|
}
|
|
|
|
opts.IgnoredPorts = append(opts.IgnoredPorts, buildIgnoredPortsList(*ignoredPorts)...)
|
|
opts.maxLiveStreams = *maxLiveStreams
|
|
opts.staleConnectionTimeout = time.Duration(*staleTimeoutSeconds) * time.Second
|
|
|
|
return NewTcpAssembler(outputItems, streamsMap, opts)
|
|
}
|
|
|
|
func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) {
|
|
go streamsMap.CloseTimedoutTcpStreamChannels()
|
|
|
|
diagnose.AppStats.SetStartTime(time.Now())
|
|
|
|
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
|
|
cleaner := Cleaner{
|
|
assembler: assembler.Assembler,
|
|
cleanPeriod: cleanPeriod,
|
|
connectionTimeout: staleConnectionTimeout,
|
|
streamsMap: streamsMap,
|
|
}
|
|
cleaner.start()
|
|
|
|
go printPeriodicStats(&cleaner, assembler)
|
|
|
|
assembler.processPackets(*hexdumppkt, mainPacketInputChan)
|
|
|
|
if diagnose.TapErrors.OutputLevel >= 2 {
|
|
assembler.dumpStreamPool()
|
|
}
|
|
|
|
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
|
|
logger.Log.Errorf("Error dumping memory profile %v", err)
|
|
}
|
|
|
|
assembler.waitAndDump()
|
|
|
|
diagnose.InternalStats.PrintStatsSummary()
|
|
diagnose.TapErrors.PrintSummary()
|
|
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
|
|
}
|
|
|
|
func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem,
|
|
options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) *tlstapper.TlsTapper {
|
|
tls := tlstapper.TlsTapper{}
|
|
chunksBufferSize := os.Getpagesize() * 100
|
|
logBufferSize := os.Getpagesize()
|
|
|
|
if err := tls.Init(chunksBufferSize, logBufferSize, *procfs, extension); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
|
|
if err := tlstapper.UpdateTapTargets(&tls, &tapTargets, *procfs); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
|
|
// A quick way to instrument libssl.so without PID filtering - used for debuging and troubleshooting
|
|
//
|
|
if os.Getenv("MIZU_GLOBAL_SSL_LIBRARY") != "" {
|
|
if err := tls.GlobalSSLLibTap(os.Getenv("MIZU_GLOBAL_SSL_LIBRARY")); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// A quick way to instrument Go `crypto/tls` without PID filtering - used for debuging and troubleshooting
|
|
//
|
|
if os.Getenv("MIZU_GLOBAL_GOLANG_PID") != "" {
|
|
if err := tls.GlobalGoTap(*procfs, os.Getenv("MIZU_GLOBAL_GOLANG_PID")); err != nil {
|
|
tlstapper.LogError(err)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
var emitter api.Emitter = &api.Emitting{
|
|
AppStats: &diagnose.AppStats,
|
|
OutputChannel: outputItems,
|
|
}
|
|
|
|
go tls.PollForLogging()
|
|
go tls.Poll(emitter, options, streamsMap)
|
|
|
|
return &tls
|
|
}
|
|
|
|
// buildIgnoredPortsList parses a comma separated list of port numbers.
//
// Surrounding whitespace of each entry is ignored. Entries that are empty,
// not a decimal number, or outside the 0-65535 range are skipped, so the
// result holds only the ports that were actually given. An empty input yields
// an empty list rather than a list containing port 0.
func buildIgnoredPortsList(ignoredPorts string) []uint16 {
	entries := strings.Split(ignoredPorts, ",")
	result := make([]uint16, 0, len(entries))

	for _, raw := range entries {
		// bitSize 16 makes ParseUint reject negative and oversized values
		// instead of letting them wrap around in the uint16 conversion.
		port, err := strconv.ParseUint(strings.TrimSpace(raw), 10, 16)
		if err != nil {
			continue
		}

		result = append(result, uint16(port))
	}

	return result
}
|