mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-23 14:58:44 +00:00
stop tapping self tapper traffic
This commit is contained in:
parent
97691d6279
commit
092bd5d22d
@ -700,6 +700,7 @@ github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ
|
||||
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220509204026-c37adfc587f4 h1:nNOrU1HVH0fnaG7GNhxCc8kNPVL035Iix7ihUF6lZT8=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220509204026-c37adfc587f4/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/mizu/shared v0.0.0-20220515064232-5fc3e38c1a85/go.mod h1:k3xLxQVWK5oj5q0fAqPHHLBzkimPXZTM2uW3Kqtv5cM=
|
||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
|
||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
github.com/wI2L/jsondiff v0.1.1 h1:r2TkoEet7E4JMO5+s1RCY2R0LrNPNHY6hbDeow2hRHw=
|
||||
|
@ -162,7 +162,10 @@ func runInTapperMode() {
|
||||
}
|
||||
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tapOpts := &tap.TapOpts{
|
||||
HostMode: hostMode,
|
||||
ApiServerAddress: *apiServerAddress,
|
||||
}
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
|
@ -10,6 +10,7 @@ type AppStats struct {
|
||||
ProcessedBytes uint64 `json:"processedBytes"`
|
||||
PacketsCount uint64 `json:"packetsCount"`
|
||||
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
|
||||
TapperPacketsCount uint64 `json:"tapperPacketsCount"`
|
||||
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
|
||||
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
|
||||
MatchedPairs uint64 `json:"matchedPairs"`
|
||||
@ -33,6 +34,10 @@ func (as *AppStats) IncTcpPacketsCount() {
|
||||
atomic.AddUint64(&as.TcpPacketsCount, 1)
|
||||
}
|
||||
|
||||
func (as *AppStats) IncTapperPacketsCount() {
|
||||
atomic.AddUint64(&as.TapperPacketsCount, 1)
|
||||
}
|
||||
|
||||
func (as *AppStats) IncReassembledTcpPayloadsCount() {
|
||||
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
|
||||
}
|
||||
@ -55,6 +60,7 @@ func (as *AppStats) DumpStats() *AppStats {
|
||||
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
|
||||
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
|
||||
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
|
||||
currentAppStats.TapperPacketsCount = resetUint64(&as.TapperPacketsCount)
|
||||
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
|
||||
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
|
||||
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
|
||||
|
@ -7,8 +7,11 @@ require (
|
||||
github.com/go-errors/errors v1.4.2
|
||||
github.com/google/gopacket v1.1.19
|
||||
github.com/hashicorp/golang-lru v0.5.4
|
||||
github.com/pkg/profile v1.6.0
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible
|
||||
github.com/struCoder/pidusage v0.2.1
|
||||
github.com/up9inc/mizu/logger v0.0.0
|
||||
github.com/up9inc/mizu/shared v0.0.0-20220515064232-5fc3e38c1a85
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
|
||||
k8s.io/api v0.23.3
|
||||
@ -18,6 +21,7 @@ require (
|
||||
github.com/go-logr/logr v1.2.2 // indirect
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
|
||||
github.com/google/go-cmp v0.5.7 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
@ -25,8 +29,6 @@ require (
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect
|
||||
github.com/pkg/profile v1.6.0 // indirect
|
||||
github.com/struCoder/pidusage v0.2.1 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.10 // indirect
|
||||
github.com/tklauser/numcpus v0.4.0 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||
@ -35,12 +37,12 @@ require (
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
k8s.io/apimachinery v0.23.3 // indirect
|
||||
k8s.io/klog/v2 v2.40.1 // indirect
|
||||
k8s.io/utils v0.0.0-20220127004650-9b3446523e65 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
|
||||
|
846
tap/go.sum
846
tap/go.sum
File diff suppressed because it is too large
Load Diff
@ -59,7 +59,8 @@ var cpuprofile = flag.String("tap.cpuprofile", "", "Write cpu profile") // cpupr
|
||||
var memprofile = flag.String("tap.memprofile", "", "Write memory profile")
|
||||
|
||||
type TapOpts struct {
|
||||
HostMode bool
|
||||
HostMode bool
|
||||
ApiServerAddress string
|
||||
}
|
||||
|
||||
var extensions []*api.Extension // global
|
||||
|
@ -2,8 +2,10 @@ package tap
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -11,6 +13,7 @@ import (
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
"github.com/up9inc/mizu/tap/source"
|
||||
@ -23,6 +26,7 @@ type tcpAssembler struct {
|
||||
streamPool *reassembly.StreamPool
|
||||
streamFactory *tcpStreamFactory
|
||||
assemblerMutex sync.Mutex
|
||||
apiServerPort uint16
|
||||
}
|
||||
|
||||
// Context
|
||||
@ -57,6 +61,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
|
||||
Assembler: assembler,
|
||||
streamPool: streamPool,
|
||||
streamFactory: streamFactory,
|
||||
apiServerPort: extractApiServerPort(opts.ApiServerAddress),
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,16 +88,20 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
|
||||
diagnose.AppStats.IncTcpPacketsCount()
|
||||
tcp := tcp.(*layers.TCP)
|
||||
|
||||
c := context{
|
||||
CaptureInfo: packet.Metadata().CaptureInfo,
|
||||
Origin: packetInfo.Source.Origin,
|
||||
if uint16(tcp.DstPort) == a.apiServerPort {
|
||||
diagnose.AppStats.IncTapperPacketsCount()
|
||||
} else {
|
||||
c := context{
|
||||
CaptureInfo: packet.Metadata().CaptureInfo,
|
||||
Origin: packetInfo.Source.Origin,
|
||||
}
|
||||
diagnose.InternalStats.Totalsz += len(tcp.Payload)
|
||||
a.assemblerMutex.Lock()
|
||||
if os.Getenv("MIZU_TAPPER_NO_ASSEMBLER") != "true" {
|
||||
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
|
||||
}
|
||||
a.assemblerMutex.Unlock()
|
||||
}
|
||||
diagnose.InternalStats.Totalsz += len(tcp.Payload)
|
||||
a.assemblerMutex.Lock()
|
||||
if os.Getenv("MIZU_TAPPER_NO_ASSEMBLER") != "true" {
|
||||
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
|
||||
}
|
||||
a.assemblerMutex.Unlock()
|
||||
}
|
||||
|
||||
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
|
||||
@ -134,3 +143,26 @@ func (a *tcpAssembler) waitAndDump() {
|
||||
logger.Log.Debugf("%s", a.Dump())
|
||||
a.assemblerMutex.Unlock()
|
||||
}
|
||||
|
||||
func extractApiServerPort(apiServerAddress string) uint16 {
|
||||
url, err := url.Parse(apiServerAddress)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Failed to parse api server url %t", err)
|
||||
return shared.DefaultApiServerPort
|
||||
} else {
|
||||
portStr := url.Port()
|
||||
|
||||
if portStr == "" {
|
||||
return shared.DefaultApiServerPort
|
||||
} else {
|
||||
apiServerPort, err := strconv.ParseInt(portStr, 10, 16)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Failed to convert api server port to number %t", err)
|
||||
}
|
||||
|
||||
return uint16(apiServerPort)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user