Support both libpcap and AF_PACKET at the same time

This commit is contained in:
M. Mert Yildiran
2022-04-28 19:19:47 +03:00
parent ea2a86a720
commit 8039970116
4 changed files with 269 additions and 106 deletions

View File

@@ -46,6 +46,7 @@ var procfs = flag.String("procfs", "/proc", "The procfs directory, used when map
var iface = flag.String("i", "en0", "Interface to read packets from")
var fname = flag.String("r", "", "Filename to read from, overrides -i")
var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet")
var targetSizeMb = flag.Int("target_size_mb", 8, "AF_PACKET target block size (MB)")
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
@@ -168,12 +169,13 @@ func initializePacketSources() error {
}
behaviour := source.TcpPacketSourceBehaviour{
SnapLength: *snaplen,
Promisc: *promisc,
Tstype: *tstype,
DecoderName: *decoder,
Lazy: *lazy,
BpfFilter: bpffilter,
SnapLength: *snaplen,
TargetSizeMb: *targetSizeMb,
Promisc: *promisc,
Tstype: *tstype,
DecoderName: *decoder,
Lazy: *lazy,
BpfFilter: bpffilter,
}
var err error

View File

@@ -0,0 +1,135 @@
package source
import (
"fmt"
"os"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/afpacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"golang.org/x/net/bpf"
)
type afPacketHandle struct {
source gopacket.ZeroCopyPacketDataSource
capture *afpacket.TPacket
decoder gopacket.Decoder
decodeOptions gopacket.DecodeOptions
}
func (h *afPacketHandle) NextPacket() (packet gopacket.Packet, err error) {
var data []byte
var ci gopacket.CaptureInfo
data, ci, err = h.source.ZeroCopyReadPacketData()
packet = gopacket.NewPacket(data, h.decoder, h.decodeOptions)
m := packet.Metadata()
m.CaptureInfo = ci
m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
return
}
func (h *afPacketHandle) SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool) {
h.decoder = decoder
h.decodeOptions = gopacket.DecodeOptions{Lazy: lazy, NoCopy: noCopy}
}
func (h *afPacketHandle) SetBPF(expr string) (err error) {
var pcapBPF []pcap.BPFInstruction
pcapBPF, err = pcap.CompileBPFFilter(layers.LinkTypeEthernet, 65535, expr)
if err != nil {
return
}
bpfIns := []bpf.RawInstruction{}
for _, ins := range pcapBPF {
bpfIns2 := bpf.RawInstruction{
Op: ins.Code,
Jt: ins.Jt,
Jf: ins.Jf,
K: ins.K,
}
bpfIns = append(bpfIns, bpfIns2)
}
h.capture.SetBPF(bpfIns)
return
}
func (h *afPacketHandle) Close() {
h.capture.Close()
}
func newAfpacketHandle(device string, targetSizeMb int, snaplen int) (handle Handle, err error) {
snaplen -= 1
if snaplen < 0 {
snaplen = 0
}
szFrame, szBlock, numBlocks, err := afpacketComputeSize(targetSizeMb, snaplen, os.Getpagesize())
if err != nil {
return
}
var capture *afpacket.TPacket
capture, err = newAfpacket(device, szFrame, szBlock, numBlocks, false, pcap.BlockForever)
if err != nil {
return
}
handle = &afPacketHandle{
capture: capture,
source: gopacket.ZeroCopyPacketDataSource(capture),
}
return
}
func newAfpacket(device string, snaplen int, block_size int, num_blocks int,
useVLAN bool, timeout time.Duration) (*afpacket.TPacket, error) {
var h *afpacket.TPacket
var err error
if device == "any" {
h, err = afpacket.NewTPacket(
afpacket.OptFrameSize(snaplen),
afpacket.OptBlockSize(block_size),
afpacket.OptNumBlocks(num_blocks),
afpacket.OptAddVLANHeader(useVLAN),
afpacket.OptPollTimeout(timeout),
afpacket.SocketRaw,
afpacket.TPacketVersion3)
} else {
h, err = afpacket.NewTPacket(
afpacket.OptInterface(device),
afpacket.OptFrameSize(snaplen),
afpacket.OptBlockSize(block_size),
afpacket.OptNumBlocks(num_blocks),
afpacket.OptAddVLANHeader(useVLAN),
afpacket.OptPollTimeout(timeout),
afpacket.SocketRaw,
afpacket.TPacketVersion3)
}
return h, err
}
// afpacketComputeSize computes the block_size and the num_blocks in such a way that the
// allocated mmap buffer is close to but smaller than target_size_mb.
// The restriction is that the block_size must be divisible by both the
// frame size and page size.
func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) (
frameSize int, blockSize int, numBlocks int, err error) {
if snaplen < pageSize {
frameSize = pageSize / (pageSize / snaplen)
} else {
frameSize = (snaplen/pageSize + 1) * pageSize
}
// 128 is the default from the gopacket library so just use that
blockSize = frameSize * 128
numBlocks = (targetSizeMb * 1024 * 1024) / blockSize
if numBlocks == 0 {
return 0, 0, 0, fmt.Errorf("Interface buffersize is too small")
}
return frameSize, blockSize, numBlocks, nil
}

85
tap/source/handle_pcap.go Normal file
View File

@@ -0,0 +1,85 @@
package source
import (
"fmt"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
)
type pcapHandle struct {
source *gopacket.PacketSource
capture *pcap.Handle
decoder gopacket.Decoder
decodeOptions gopacket.DecodeOptions
}
func (h *pcapHandle) NextPacket() (packet gopacket.Packet, err error) {
return h.source.NextPacket()
}
func (h *pcapHandle) SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool) {
h.source = gopacket.NewPacketSource(h.capture, decoder)
h.source.Lazy = lazy
h.source.NoCopy = noCopy
}
func (h *pcapHandle) SetBPF(expr string) (err error) {
return h.capture.SetBPFFilter(expr)
}
func (h *pcapHandle) Close() {
h.capture.Close()
}
func newPcapHandle(filename string, device string, snaplen int, promisc bool, tstype string) (handle Handle, err error) {
var capture *pcap.Handle
if filename != "" {
if capture, err = pcap.OpenOffline(filename); err != nil {
err = fmt.Errorf("PCAP OpenOffline error: %v", err)
return
}
} else {
// This is a little complicated because we want to allow all possible options
// for creating the packet capture handle... instead of all this you can
// just call pcap.OpenLive if you want a simple handle.
var inactive *pcap.InactiveHandle
inactive, err = pcap.NewInactiveHandle(device)
if err != nil {
err = fmt.Errorf("could not create: %v", err)
return
}
defer inactive.CleanUp()
if err = inactive.SetSnapLen(snaplen); err != nil {
err = fmt.Errorf("could not set snap length: %v", err)
return
} else if err = inactive.SetPromisc(promisc); err != nil {
err = fmt.Errorf("could not set promisc mode: %v", err)
return
} else if err = inactive.SetTimeout(time.Second); err != nil {
err = fmt.Errorf("could not set timeout: %v", err)
return
}
if tstype != "" {
var t pcap.TimestampSource
if t, err = pcap.TimestampSourceFromString(tstype); err != nil {
err = fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
return
} else if err = inactive.SetTimestampSource(t); err != nil {
err = fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
return
}
}
if capture, err = inactive.Activate(); err != nil {
err = fmt.Errorf("PCAP Activate error: %v", err)
return
}
}
handle = &pcapHandle{
capture: capture,
}
return
}

View File

@@ -3,23 +3,24 @@ package source
import (
"fmt"
"io"
"os"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/afpacket"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
"golang.org/x/net/bpf"
)
type Handle interface {
NextPacket() (packet gopacket.Packet, err error)
SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool)
SetBPF(expr string) (err error)
Close()
}
type tcpPacketSource struct {
source gopacket.ZeroCopyPacketDataSource
Handle *afpacket.TPacket
Handle Handle
defragger *ip4defrag.IPv4Defragmenter
Behaviour *TcpPacketSourceBehaviour
name string
@@ -27,12 +28,13 @@ type tcpPacketSource struct {
}
type TcpPacketSourceBehaviour struct {
SnapLength int
Promisc bool
Tstype string
DecoderName string
Lazy bool
BpfFilter string
SnapLength int
TargetSizeMb int
Promisc bool
Tstype string
DecoderName string
Lazy bool
BpfFilter string
}
type TcpPacketInfo struct {
@@ -40,61 +42,6 @@ type TcpPacketInfo struct {
Source *tcpPacketSource
}
func newAfpacketHandle(device string, snaplen int, block_size int, num_blocks int,
useVLAN bool, timeout time.Duration) (*afpacket.TPacket, error) {
var h *afpacket.TPacket
var err error
if device == "any" {
h, err = afpacket.NewTPacket(
afpacket.OptFrameSize(snaplen),
afpacket.OptBlockSize(block_size),
afpacket.OptNumBlocks(num_blocks),
afpacket.OptAddVLANHeader(useVLAN),
afpacket.OptPollTimeout(timeout),
afpacket.SocketRaw,
afpacket.TPacketVersion3)
} else {
h, err = afpacket.NewTPacket(
afpacket.OptInterface(device),
afpacket.OptFrameSize(snaplen),
afpacket.OptBlockSize(block_size),
afpacket.OptNumBlocks(num_blocks),
afpacket.OptAddVLANHeader(useVLAN),
afpacket.OptPollTimeout(timeout),
afpacket.SocketRaw,
afpacket.TPacketVersion3)
}
return h, err
}
// afpacketComputeSize computes the block_size and the num_blocks in such a way that the
// allocated mmap buffer is close to but smaller than target_size_mb.
// The restriction is that the block_size must be divisible by both the
// frame size and page size.
func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) (
frameSize int, blockSize int, numBlocks int, err error) {
fmt.Printf("[afpacketComputeSize] targetSizeMb: %v snaplen: %v pageSize: %v\n", targetSizeMb, snaplen, pageSize)
if snaplen < pageSize {
frameSize = pageSize / (pageSize / snaplen)
} else {
frameSize = (snaplen/pageSize + 1) * pageSize
}
// 128 is the default from the gopacket library so just use that
blockSize = frameSize * 128
numBlocks = (targetSizeMb * 1024 * 1024) / blockSize
if numBlocks == 0 {
return 0, 0, 0, fmt.Errorf("Interface buffersize is too small")
}
return frameSize, blockSize, numBlocks, nil
}
func newTcpPacketSource(name, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
var err error
@@ -106,14 +53,32 @@ func newTcpPacketSource(name, filename string, interfaceName string,
Origin: origin,
}
szFrame, szBlock, numBlocks, err := afpacketComputeSize(8, 65535, os.Getpagesize())
result.Handle, err = newAfpacketHandle(
interfaceName,
behaviour.TargetSizeMb,
behaviour.SnapLength,
)
if err != nil {
panic(err)
}
result.Handle, err = newAfpacketHandle(interfaceName, szFrame, szBlock, numBlocks, false, pcap.BlockForever)
if err != nil {
panic(err)
logger.Log.Warning(err)
result.Handle, err = newPcapHandle(
filename,
interfaceName,
behaviour.SnapLength,
behaviour.Promisc,
behaviour.Tstype,
)
if err != nil {
return nil, err
} else {
logger.Log.Infof("Using libpcap as the capture source")
}
} else {
logger.Log.Infof("Using AF_PACKET socket as the capture source")
}
decoder := gopacket.DecodersByLayerName[fmt.Sprintf("%s", layers.LinkTypeEthernet)]
result.Handle.SetDecoder(decoder, behaviour.Lazy, true)
if behaviour.BpfFilter != "" {
logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
if err = result.setBPFFilter(behaviour.BpfFilter); err != nil {
@@ -121,8 +86,6 @@ func newTcpPacketSource(name, filename string, interfaceName string,
}
}
result.source = gopacket.ZeroCopyPacketDataSource(result.Handle)
return result, nil
}
@@ -131,21 +94,7 @@ func (source *tcpPacketSource) String() string {
}
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
pcapBPF, err := pcap.CompileBPFFilter(layers.LinkTypeEthernet, 65535, expr)
if err != nil {
panic(err)
}
bpfIns := []bpf.RawInstruction{}
for _, ins := range pcapBPF {
bpfIns2 := bpf.RawInstruction{
Op: ins.Code,
Jt: ins.Jt,
Jf: ins.Jf,
K: ins.K,
}
bpfIns = append(bpfIns, bpfIns2)
}
return source.Handle.SetBPF(bpfIns)
return source.Handle.SetBPF(expr)
}
func (source *tcpPacketSource) close() {
@@ -158,15 +107,7 @@ func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPack
logger.Log.Infof("Start reading packets from %v", source.name)
for {
data, ci, err := source.source.ZeroCopyReadPacketData()
decoder := gopacket.DecodersByLayerName[fmt.Sprintf("%s", layers.LinkTypeEthernet)]
decodeOptions := gopacket.DecodeOptions{Lazy: false, NoCopy: true}
packet := gopacket.NewPacket(data, decoder, decodeOptions)
m := packet.Metadata()
m.CaptureInfo = ci
m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
packet, err := source.Handle.NextPacket()
if err == io.EOF {
logger.Log.Infof("Got EOF while reading packets from %v", source.name)