mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-27 08:39:49 +00:00
Add AF_PACKET
support (#1052)
* Add `AF_PACKET` support * Update `.gitignore` * Support both `libpcap` and `AF_PACKET` at the same time * Fix linter errors * Fix a bug that introduced while fixing a linter error * Revert the changes related to `MaxBufferedPages` prefixed consts * #run_acceptance_tests * #run_acceptance_tests * Revert channel buffer size #run_acceptance_tests * Revert "Revert channel buffer size #run_acceptance_tests" This reverts commite62c3844cd
. * Increase `cy.wait` from `500` to `1000` #run_acceptance_tests * Fix the `pcapHandle` handle * Revert "Increase `cy.wait` from `500` to `1000` #run_acceptance_tests" This reverts commit938c550e72
. * #run_acceptance_tests * Handle the merge conflicts * Add `AF_XDP` support * Implement `Close()` of `AF_XDP` and fix linter errors * Fix `NewIPProtoProgram` function and internet protocol number * Pipe the packet stream from every network interface using `*pcapgo.NgReader` and `*pcapgo.NgWriter` Implement `SetDecoder` and `SetBPF` methods. * Fix `NewNgReader` call * Implement `Stats` method * Rebroadcast to the XDP socket * Add `-packet-capture` flag and make `AF_PACKET`, `AF_XDP` optional * #run_acceptance_tests * Fix `newAfXdpHandle` method * #run_acceptance_tests * Update tap/xdp/ipproto.c Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> * Update tap/xdp/ipproto.c Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> * Update tap/xdp/ipproto.c Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> * Fix several issues * Update tap/xdp/ipproto.c Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> * Fix `ipproto.c` * Remove `AF_XDP` * Comment on frameSize Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
This commit is contained in:
parent
378270ee3d
commit
e52ba1f05d
1
.gitignore
vendored
1
.gitignore
vendored
@ -53,6 +53,7 @@ tap/extensions/*/expect
|
|||||||
**/node_modules/**
|
**/node_modules/**
|
||||||
**/dist/**
|
**/dist/**
|
||||||
*.editorconfig
|
*.editorconfig
|
||||||
|
ui/up9-mizu-common-0.0.0.tgz
|
||||||
|
|
||||||
# Ignore *.log files
|
# Ignore *.log files
|
||||||
*.log
|
*.log
|
||||||
|
@ -104,8 +104,7 @@ ARG BUILD_TIMESTAMP
|
|||||||
ARG VER=0.0
|
ARG VER=0.0
|
||||||
|
|
||||||
WORKDIR /app/tap/tlstapper
|
WORKDIR /app/tap/tlstapper
|
||||||
|
RUN rm *_bpfel_*
|
||||||
RUN rm tlstapper_bpf*
|
|
||||||
RUN GOARCH=${BUILDARCH} go generate tls_tapper.go
|
RUN GOARCH=${BUILDARCH} go generate tls_tapper.go
|
||||||
|
|
||||||
WORKDIR /app/agent-build
|
WORKDIR /app/agent-build
|
||||||
|
@ -51,6 +51,7 @@ type TapConfig struct {
|
|||||||
TapperResources shared.Resources `yaml:"tapper-resources"`
|
TapperResources shared.Resources `yaml:"tapper-resources"`
|
||||||
ServiceMesh bool `yaml:"service-mesh" default:"false"`
|
ServiceMesh bool `yaml:"service-mesh" default:"false"`
|
||||||
Tls bool `yaml:"tls" default:"false"`
|
Tls bool `yaml:"tls" default:"false"`
|
||||||
|
PacketCapture string `yaml:"packet-capture" default:"libpcap"`
|
||||||
Profiler bool `yaml:"profiler" default:"false"`
|
Profiler bool `yaml:"profiler" default:"false"`
|
||||||
MaxLiveStreams int `yaml:"max-live-streams" default:"500"`
|
MaxLiveStreams int `yaml:"max-live-streams" default:"500"`
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ require (
|
|||||||
github.com/up9inc/mizu/tap/api v0.0.0
|
github.com/up9inc/mizu/tap/api v0.0.0
|
||||||
github.com/up9inc/mizu/tap/dbgctl v0.0.0
|
github.com/up9inc/mizu/tap/dbgctl v0.0.0
|
||||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
|
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
|
||||||
|
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
|
||||||
k8s.io/api v0.23.3
|
k8s.io/api v0.23.3
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -33,7 +34,6 @@ require (
|
|||||||
github.com/tklauser/go-sysconf v0.3.10 // indirect
|
github.com/tklauser/go-sysconf v0.3.10 // indirect
|
||||||
github.com/tklauser/numcpus v0.4.0 // indirect
|
github.com/tklauser/numcpus v0.4.0 // indirect
|
||||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||||
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
|
|
||||||
golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect
|
golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect
|
||||||
golang.org/x/text v0.3.7 // indirect
|
golang.org/x/text v0.3.7 // indirect
|
||||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||||
|
@ -51,11 +51,13 @@ var maxLiveStreams = flag.Int("max-live-streams", 500, "Maximum live streams to
|
|||||||
var iface = flag.String("i", "en0", "Interface to read packets from")
|
var iface = flag.String("i", "en0", "Interface to read packets from")
|
||||||
var fname = flag.String("r", "", "Filename to read from, overrides -i")
|
var fname = flag.String("r", "", "Filename to read from, overrides -i")
|
||||||
var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet")
|
var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet")
|
||||||
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
|
var targetSizeMb = flag.Int("target-size-mb", 8, "AF_PACKET target block size (MB)")
|
||||||
|
var tstype = flag.String("timestamp-type", "", "Type of timestamps to use")
|
||||||
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
|
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
|
||||||
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
|
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
|
||||||
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
|
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
|
||||||
var tls = flag.Bool("tls", false, "Enable TLS tapper")
|
var tls = flag.Bool("tls", false, "Enable TLS tapper")
|
||||||
|
var packetCapture = flag.String("packet-capture", "libpcap", "Packet capture backend. Possible values: libpcap, af_packet")
|
||||||
|
|
||||||
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
||||||
|
|
||||||
@ -211,6 +213,7 @@ func initializePacketSources() error {
|
|||||||
|
|
||||||
behaviour := source.TcpPacketSourceBehaviour{
|
behaviour := source.TcpPacketSourceBehaviour{
|
||||||
SnapLength: *snaplen,
|
SnapLength: *snaplen,
|
||||||
|
TargetSizeMb: *targetSizeMb,
|
||||||
Promisc: *promisc,
|
Promisc: *promisc,
|
||||||
Tstype: *tstype,
|
Tstype: *tstype,
|
||||||
DecoderName: *decoder,
|
DecoderName: *decoder,
|
||||||
@ -219,7 +222,7 @@ func initializePacketSources() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour, !*nodefrag, mainPacketInputChan)
|
packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour, !*nodefrag, *packetCapture, mainPacketInputChan)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
152
tap/source/handle_af_packet.go
Normal file
152
tap/source/handle_af_packet.go
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
package source
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/gopacket"
|
||||||
|
"github.com/google/gopacket/afpacket"
|
||||||
|
"github.com/google/gopacket/layers"
|
||||||
|
"github.com/google/gopacket/pcap"
|
||||||
|
"golang.org/x/net/bpf"
|
||||||
|
)
|
||||||
|
|
||||||
|
type afPacketHandle struct {
|
||||||
|
source gopacket.ZeroCopyPacketDataSource
|
||||||
|
capture *afpacket.TPacket
|
||||||
|
decoder gopacket.Decoder
|
||||||
|
decodeOptions gopacket.DecodeOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *afPacketHandle) NextPacket() (packet gopacket.Packet, err error) {
|
||||||
|
var data []byte
|
||||||
|
var ci gopacket.CaptureInfo
|
||||||
|
data, ci, err = h.source.ZeroCopyReadPacketData()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
packet = gopacket.NewPacket(data, h.decoder, h.decodeOptions)
|
||||||
|
m := packet.Metadata()
|
||||||
|
m.CaptureInfo = ci
|
||||||
|
m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *afPacketHandle) SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool) {
|
||||||
|
h.decoder = decoder
|
||||||
|
h.decodeOptions = gopacket.DecodeOptions{Lazy: lazy, NoCopy: noCopy}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *afPacketHandle) SetBPF(expr string) (err error) {
|
||||||
|
var pcapBPF []pcap.BPFInstruction
|
||||||
|
pcapBPF, err = pcap.CompileBPFFilter(layers.LinkTypeEthernet, 65535, expr)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bpfIns := []bpf.RawInstruction{}
|
||||||
|
for _, ins := range pcapBPF {
|
||||||
|
bpfIns2 := bpf.RawInstruction{
|
||||||
|
Op: ins.Code,
|
||||||
|
Jt: ins.Jt,
|
||||||
|
Jf: ins.Jf,
|
||||||
|
K: ins.K,
|
||||||
|
}
|
||||||
|
bpfIns = append(bpfIns, bpfIns2)
|
||||||
|
}
|
||||||
|
err = h.capture.SetBPF(bpfIns)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *afPacketHandle) LinkType() layers.LinkType {
|
||||||
|
return layers.LinkTypeEthernet
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *afPacketHandle) Stats() (packetsReceived uint, packetsDropped uint, err error) {
|
||||||
|
var stats afpacket.SocketStatsV3
|
||||||
|
_, stats, err = h.capture.SocketStats()
|
||||||
|
packetsReceived = stats.Packets()
|
||||||
|
packetsDropped = stats.Drops()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *afPacketHandle) Close() (err error) {
|
||||||
|
h.capture.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAfpacketHandle(device string, targetSizeMb int, snaplen int) (handle Handle, err error) {
|
||||||
|
snaplen -= 1
|
||||||
|
if snaplen < 0 {
|
||||||
|
snaplen = 0
|
||||||
|
}
|
||||||
|
szFrame, szBlock, numBlocks, err := afpacketComputeSize(targetSizeMb, snaplen, os.Getpagesize())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var capture *afpacket.TPacket
|
||||||
|
capture, err = newAfpacket(device, szFrame, szBlock, numBlocks, false, pcap.BlockForever)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
handle = &afPacketHandle{
|
||||||
|
capture: capture,
|
||||||
|
source: gopacket.ZeroCopyPacketDataSource(capture),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAfpacket(device string, snaplen int, block_size int, num_blocks int,
|
||||||
|
useVLAN bool, timeout time.Duration) (*afpacket.TPacket, error) {
|
||||||
|
|
||||||
|
var h *afpacket.TPacket
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if device == "any" {
|
||||||
|
h, err = afpacket.NewTPacket(
|
||||||
|
afpacket.OptFrameSize(snaplen),
|
||||||
|
afpacket.OptBlockSize(block_size),
|
||||||
|
afpacket.OptNumBlocks(num_blocks),
|
||||||
|
afpacket.OptAddVLANHeader(useVLAN),
|
||||||
|
afpacket.OptPollTimeout(timeout),
|
||||||
|
afpacket.SocketRaw,
|
||||||
|
afpacket.TPacketVersion3)
|
||||||
|
} else {
|
||||||
|
h, err = afpacket.NewTPacket(
|
||||||
|
afpacket.OptInterface(device),
|
||||||
|
afpacket.OptFrameSize(snaplen),
|
||||||
|
afpacket.OptBlockSize(block_size),
|
||||||
|
afpacket.OptNumBlocks(num_blocks),
|
||||||
|
afpacket.OptAddVLANHeader(useVLAN),
|
||||||
|
afpacket.OptPollTimeout(timeout),
|
||||||
|
afpacket.SocketRaw,
|
||||||
|
afpacket.TPacketVersion3)
|
||||||
|
}
|
||||||
|
return h, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// afpacketComputeSize computes the block_size and the num_blocks in such a way that the
|
||||||
|
// allocated mmap buffer is close to but smaller than target_size_mb.
|
||||||
|
// The restriction is that the block_size must be divisible by both the
|
||||||
|
// frame size and page size.
|
||||||
|
func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) (
|
||||||
|
frameSize int, blockSize int, numBlocks int, err error) {
|
||||||
|
|
||||||
|
// frameSize calculation was taken from gopacket's afpacket.go
|
||||||
|
if snaplen < pageSize {
|
||||||
|
frameSize = pageSize / (pageSize / snaplen)
|
||||||
|
} else {
|
||||||
|
frameSize = (snaplen/pageSize + 1) * pageSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// 128 is the default from the gopacket library so just use that
|
||||||
|
blockSize = frameSize * 128
|
||||||
|
numBlocks = (targetSizeMb * 1024 * 1024) / blockSize
|
||||||
|
|
||||||
|
if numBlocks == 0 {
|
||||||
|
return 0, 0, 0, fmt.Errorf("Interface buffersize is too small")
|
||||||
|
}
|
||||||
|
|
||||||
|
return frameSize, blockSize, numBlocks, nil
|
||||||
|
}
|
97
tap/source/handle_pcap.go
Normal file
97
tap/source/handle_pcap.go
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
package source
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/gopacket"
|
||||||
|
"github.com/google/gopacket/layers"
|
||||||
|
"github.com/google/gopacket/pcap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type pcapHandle struct {
|
||||||
|
source *gopacket.PacketSource
|
||||||
|
capture *pcap.Handle
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *pcapHandle) NextPacket() (packet gopacket.Packet, err error) {
|
||||||
|
return h.source.NextPacket()
|
||||||
|
}
|
||||||
|
func (h *pcapHandle) SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool) {
|
||||||
|
h.source = gopacket.NewPacketSource(h.capture, decoder)
|
||||||
|
h.source.Lazy = lazy
|
||||||
|
h.source.NoCopy = noCopy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *pcapHandle) SetBPF(expr string) (err error) {
|
||||||
|
return h.capture.SetBPFFilter(expr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *pcapHandle) LinkType() layers.LinkType {
|
||||||
|
return h.capture.LinkType()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *pcapHandle) Stats() (packetsReceived uint, packetsDropped uint, err error) {
|
||||||
|
var stats *pcap.Stats
|
||||||
|
stats, err = h.capture.Stats()
|
||||||
|
packetsReceived = uint(stats.PacketsReceived)
|
||||||
|
packetsDropped = uint(stats.PacketsDropped)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *pcapHandle) Close() (err error) {
|
||||||
|
h.capture.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPcapHandle(filename string, device string, snaplen int, promisc bool, tstype string) (handle Handle, err error) {
|
||||||
|
var capture *pcap.Handle
|
||||||
|
|
||||||
|
if filename != "" {
|
||||||
|
if capture, err = pcap.OpenOffline(filename); err != nil {
|
||||||
|
err = fmt.Errorf("PCAP OpenOffline error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// This is a little complicated because we want to allow all possible options
|
||||||
|
// for creating the packet capture handle... instead of all this you can
|
||||||
|
// just call pcap.OpenLive if you want a simple handle.
|
||||||
|
var inactive *pcap.InactiveHandle
|
||||||
|
inactive, err = pcap.NewInactiveHandle(device)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("could not create: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer inactive.CleanUp()
|
||||||
|
if err = inactive.SetSnapLen(snaplen); err != nil {
|
||||||
|
err = fmt.Errorf("could not set snap length: %v", err)
|
||||||
|
return
|
||||||
|
} else if err = inactive.SetPromisc(promisc); err != nil {
|
||||||
|
err = fmt.Errorf("could not set promisc mode: %v", err)
|
||||||
|
return
|
||||||
|
} else if err = inactive.SetTimeout(time.Second); err != nil {
|
||||||
|
err = fmt.Errorf("could not set timeout: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tstype != "" {
|
||||||
|
var t pcap.TimestampSource
|
||||||
|
if t, err = pcap.TimestampSourceFromString(tstype); err != nil {
|
||||||
|
err = fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||||
|
return
|
||||||
|
} else if err = inactive.SetTimestampSource(t); err != nil {
|
||||||
|
err = fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if capture, err = inactive.Activate(); err != nil {
|
||||||
|
err = fmt.Errorf("PCAP Activate error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
handle = &pcapHandle{
|
||||||
|
capture: capture,
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
@ -9,7 +9,7 @@ import (
|
|||||||
"github.com/vishvananda/netns"
|
"github.com/vishvananda/netns"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newNetnsPacketSource(procfs string, pid string, interfaceName string,
|
func newNetnsPacketSource(procfs string, pid string, interfaceName string, packetCapture string,
|
||||||
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
||||||
nsh, err := netns.GetFromPath(fmt.Sprintf("%s/%s/ns/net", procfs, pid))
|
nsh, err := netns.GetFromPath(fmt.Sprintf("%s/%s/ns/net", procfs, pid))
|
||||||
|
|
||||||
@ -18,7 +18,7 @@ func newNetnsPacketSource(procfs string, pid string, interfaceName string,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
src, err := newPacketSourceFromNetnsHandle(pid, nsh, interfaceName, behaviour, origin)
|
src, err := newPacketSourceFromNetnsHandle(pid, nsh, interfaceName, packetCapture, behaviour, origin)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log.Errorf("Error starting netns packet source for %s - %w", pid, err)
|
logger.Log.Errorf("Error starting netns packet source for %s - %w", pid, err)
|
||||||
@ -28,7 +28,7 @@ func newNetnsPacketSource(procfs string, pid string, interfaceName string,
|
|||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceName string,
|
func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceName string, packetCapture string,
|
||||||
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
||||||
|
|
||||||
done := make(chan *tcpPacketSource)
|
done := make(chan *tcpPacketSource)
|
||||||
@ -58,7 +58,7 @@ func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceNam
|
|||||||
}
|
}
|
||||||
|
|
||||||
name := fmt.Sprintf("netns-%s-%s", pid, interfaceName)
|
name := fmt.Sprintf("netns-%s-%s", pid, interfaceName)
|
||||||
src, err := newTcpPacketSource(name, "", interfaceName, behaviour, origin)
|
src, err := newTcpPacketSource(name, "", interfaceName, packetCapture, behaviour, origin)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log.Errorf("Error listening to PID %s - %w", pid, err)
|
logger.Log.Errorf("Error listening to PID %s - %w", pid, err)
|
||||||
|
@ -16,6 +16,7 @@ type PacketSourceManagerConfig struct {
|
|||||||
mtls bool
|
mtls bool
|
||||||
procfs string
|
procfs string
|
||||||
interfaceName string
|
interfaceName string
|
||||||
|
packetCapture string
|
||||||
behaviour TcpPacketSourceBehaviour
|
behaviour TcpPacketSourceBehaviour
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,8 +26,9 @@ type PacketSourceManager struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
|
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
|
||||||
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) {
|
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool,
|
||||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
packetCapture string, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) {
|
||||||
|
hostSource, err := newHostPacketSource(filename, interfaceName, packetCapture, behaviour)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -41,6 +43,7 @@ func NewPacketSourceManager(procfs string, filename string, interfaceName string
|
|||||||
mtls: mtls,
|
mtls: mtls,
|
||||||
procfs: procfs,
|
procfs: procfs,
|
||||||
interfaceName: interfaceName,
|
interfaceName: interfaceName,
|
||||||
|
packetCapture: packetCapture,
|
||||||
behaviour: behaviour,
|
behaviour: behaviour,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,7 +51,7 @@ func NewPacketSourceManager(procfs string, filename string, interfaceName string
|
|||||||
return sourceManager, nil
|
return sourceManager, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHostPacketSource(filename string, interfaceName string,
|
func newHostPacketSource(filename string, interfaceName string, packetCapture string,
|
||||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||||
var name string
|
var name string
|
||||||
if filename == "" {
|
if filename == "" {
|
||||||
@ -57,7 +60,7 @@ func newHostPacketSource(filename string, interfaceName string,
|
|||||||
name = fmt.Sprintf("file-%s", filename)
|
name = fmt.Sprintf("file-%s", filename)
|
||||||
}
|
}
|
||||||
|
|
||||||
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour, api.Pcap)
|
source, err := newTcpPacketSource(name, filename, interfaceName, packetCapture, behaviour, api.Pcap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -67,14 +70,14 @@ func newHostPacketSource(filename string, interfaceName string,
|
|||||||
|
|
||||||
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
||||||
if m.config.mtls {
|
if m.config.mtls {
|
||||||
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour, ipdefrag, packets)
|
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.packetCapture, m.config.behaviour, ipdefrag, packets)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.setBPFFilter(pods)
|
m.setBPFFilter(pods)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
||||||
interfaceName string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
interfaceName string, packetCapture string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
||||||
|
|
||||||
relevantPids := m.getRelevantPids(procfs, pods)
|
relevantPids := m.getRelevantPids(procfs, pods)
|
||||||
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
|
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
|
||||||
@ -88,7 +91,7 @@ func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
|||||||
|
|
||||||
for pid, origin := range relevantPids {
|
for pid, origin := range relevantPids {
|
||||||
if _, ok := m.sources[pid]; !ok {
|
if _, ok := m.sources[pid]; !ok {
|
||||||
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour, origin)
|
source, err := newNetnsPacketSource(procfs, pid, interfaceName, packetCapture, behaviour, origin)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
go source.readPackets(ipdefrag, packets)
|
go source.readPackets(ipdefrag, packets)
|
||||||
@ -165,12 +168,12 @@ func (m *PacketSourceManager) Stats() string {
|
|||||||
result := ""
|
result := ""
|
||||||
|
|
||||||
for _, source := range m.sources {
|
for _, source := range m.sources {
|
||||||
stats, err := source.Stats()
|
packetsReceived, packetsDropped, err := source.Stats()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result = result + fmt.Sprintf("[%s: err:%s]", source.String(), err)
|
result = result + fmt.Sprintf("[%s: err:%s]", source.String(), err)
|
||||||
} else {
|
} else {
|
||||||
result = result + fmt.Sprintf("[%s: rec: %d dropped: %d]", source.String(), stats.PacketsReceived, stats.PacketsDropped)
|
result = result + fmt.Sprintf("[%s: rec: %d dropped: %d]", source.String(), packetsReceived, packetsDropped)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,21 +3,27 @@ package source
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/gopacket"
|
"github.com/google/gopacket"
|
||||||
"github.com/google/gopacket/ip4defrag"
|
"github.com/google/gopacket/ip4defrag"
|
||||||
"github.com/google/gopacket/layers"
|
"github.com/google/gopacket/layers"
|
||||||
"github.com/google/gopacket/pcap"
|
|
||||||
"github.com/up9inc/mizu/logger"
|
"github.com/up9inc/mizu/logger"
|
||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
"github.com/up9inc/mizu/tap/dbgctl"
|
"github.com/up9inc/mizu/tap/dbgctl"
|
||||||
"github.com/up9inc/mizu/tap/diagnose"
|
"github.com/up9inc/mizu/tap/diagnose"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Handle interface {
|
||||||
|
NextPacket() (packet gopacket.Packet, err error)
|
||||||
|
SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool)
|
||||||
|
SetBPF(expr string) (err error)
|
||||||
|
LinkType() layers.LinkType
|
||||||
|
Stats() (packetsReceived uint, packetsDropped uint, err error)
|
||||||
|
Close() (err error)
|
||||||
|
}
|
||||||
|
|
||||||
type tcpPacketSource struct {
|
type tcpPacketSource struct {
|
||||||
source *gopacket.PacketSource
|
Handle Handle
|
||||||
handle *pcap.Handle
|
|
||||||
defragger *ip4defrag.IPv4Defragmenter
|
defragger *ip4defrag.IPv4Defragmenter
|
||||||
Behaviour *TcpPacketSourceBehaviour
|
Behaviour *TcpPacketSourceBehaviour
|
||||||
name string
|
name string
|
||||||
@ -26,6 +32,7 @@ type tcpPacketSource struct {
|
|||||||
|
|
||||||
type TcpPacketSourceBehaviour struct {
|
type TcpPacketSourceBehaviour struct {
|
||||||
SnapLength int
|
SnapLength int
|
||||||
|
TargetSizeMb int
|
||||||
Promisc bool
|
Promisc bool
|
||||||
Tstype string
|
Tstype string
|
||||||
DecoderName string
|
DecoderName string
|
||||||
@ -38,7 +45,7 @@ type TcpPacketInfo struct {
|
|||||||
Source *tcpPacketSource
|
Source *tcpPacketSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTcpPacketSource(name, filename string, interfaceName string,
|
func newTcpPacketSource(name, filename string, interfaceName string, packetCapture string,
|
||||||
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@ -49,56 +56,48 @@ func newTcpPacketSource(name, filename string, interfaceName string,
|
|||||||
Origin: origin,
|
Origin: origin,
|
||||||
}
|
}
|
||||||
|
|
||||||
if filename != "" {
|
switch packetCapture {
|
||||||
if result.handle, err = pcap.OpenOffline(filename); err != nil {
|
case "af_packet":
|
||||||
return result, fmt.Errorf("PCAP OpenOffline error: %v", err)
|
result.Handle, err = newAfpacketHandle(
|
||||||
}
|
interfaceName,
|
||||||
} else {
|
behaviour.TargetSizeMb,
|
||||||
// This is a little complicated because we want to allow all possible options
|
behaviour.SnapLength,
|
||||||
// for creating the packet capture handle... instead of all this you can
|
)
|
||||||
// just call pcap.OpenLive if you want a simple handle.
|
|
||||||
inactive, err := pcap.NewInactiveHandle(interfaceName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, fmt.Errorf("could not create: %v", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
defer inactive.CleanUp()
|
logger.Log.Infof("Using AF_PACKET socket as the capture source")
|
||||||
if err = inactive.SetSnapLen(behaviour.SnapLength); err != nil {
|
default:
|
||||||
return result, fmt.Errorf("could not set snap length: %v", err)
|
result.Handle, err = newPcapHandle(
|
||||||
} else if err = inactive.SetPromisc(behaviour.Promisc); err != nil {
|
filename,
|
||||||
return result, fmt.Errorf("could not set promisc mode: %v", err)
|
interfaceName,
|
||||||
} else if err = inactive.SetTimeout(time.Second); err != nil {
|
behaviour.SnapLength,
|
||||||
return result, fmt.Errorf("could not set timeout: %v", err)
|
behaviour.Promisc,
|
||||||
|
behaviour.Tstype,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
if behaviour.Tstype != "" {
|
logger.Log.Infof("Using libpcap as the capture source")
|
||||||
if t, err := pcap.TimestampSourceFromString(behaviour.Tstype); err != nil {
|
|
||||||
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
|
||||||
} else if err := inactive.SetTimestampSource(t); err != nil {
|
|
||||||
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var decoder gopacket.Decoder
|
||||||
|
var ok bool
|
||||||
|
if behaviour.DecoderName == "" {
|
||||||
|
behaviour.DecoderName = result.Handle.LinkType().String()
|
||||||
}
|
}
|
||||||
if result.handle, err = inactive.Activate(); err != nil {
|
if decoder, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok {
|
||||||
return result, fmt.Errorf("PCAP Activate error: %v", err)
|
return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
result.Handle.SetDecoder(decoder, behaviour.Lazy, true)
|
||||||
|
|
||||||
if behaviour.BpfFilter != "" {
|
if behaviour.BpfFilter != "" {
|
||||||
logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
|
logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
|
||||||
if err = result.handle.SetBPFFilter(behaviour.BpfFilter); err != nil {
|
if err = result.setBPFFilter(behaviour.BpfFilter); err != nil {
|
||||||
return nil, fmt.Errorf("BPF filter error: %v", err)
|
return nil, fmt.Errorf("BPF filter error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var dec gopacket.Decoder
|
|
||||||
var ok bool
|
|
||||||
if behaviour.DecoderName == "" {
|
|
||||||
behaviour.DecoderName = result.handle.LinkType().String()
|
|
||||||
}
|
|
||||||
if dec, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok {
|
|
||||||
return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName)
|
|
||||||
}
|
|
||||||
result.source = gopacket.NewPacketSource(result.handle, dec)
|
|
||||||
result.source.Lazy = behaviour.Lazy
|
|
||||||
result.source.NoCopy = true
|
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,17 +106,17 @@ func (source *tcpPacketSource) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
|
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
|
||||||
return source.handle.SetBPFFilter(expr)
|
return source.Handle.SetBPF(expr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (source *tcpPacketSource) close() {
|
func (source *tcpPacketSource) close() {
|
||||||
if source.handle != nil {
|
if source.Handle != nil {
|
||||||
source.handle.Close()
|
source.Handle.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (source *tcpPacketSource) Stats() (stat *pcap.Stats, err error) {
|
func (source *tcpPacketSource) Stats() (packetsReceived uint, packetsDropped uint, err error) {
|
||||||
return source.handle.Stats()
|
return source.Handle.Stats()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPacketInfo) {
|
func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPacketInfo) {
|
||||||
@ -127,7 +126,7 @@ func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPack
|
|||||||
logger.Log.Infof("Start reading packets from %v", source.name)
|
logger.Log.Infof("Start reading packets from %v", source.name)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
packet, err := source.source.NextPacket()
|
packet, err := source.Handle.NextPacket()
|
||||||
|
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
logger.Log.Infof("Got EOF while reading packets from %v", source.name)
|
logger.Log.Infof("Got EOF while reading packets from %v", source.name)
|
||||||
|
Loading…
Reference in New Issue
Block a user