diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go
index f84b4a9a8..ee01658c6 100644
--- a/tap/passive_tapper.go
+++ b/tap/passive_tapper.go
@@ -46,6 +46,7 @@ var procfs = flag.String("procfs", "/proc", "The procfs directory, used when map
 var iface = flag.String("i", "en0", "Interface to read packets from")
 var fname = flag.String("r", "", "Filename to read from, overrides -i")
 var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet")
+var targetSizeMb = flag.Int("target_size_mb", 8, "AF_PACKET target block size (MB)")
 var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
 var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
 var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
@@ -168,12 +169,13 @@ func initializePacketSources() error {
 	}
 
 	behaviour := source.TcpPacketSourceBehaviour{
-		SnapLength:  *snaplen,
-		Promisc:     *promisc,
-		Tstype:      *tstype,
-		DecoderName: *decoder,
-		Lazy:        *lazy,
-		BpfFilter:   bpffilter,
+		SnapLength:   *snaplen,
+		TargetSizeMb: *targetSizeMb,
+		Promisc:      *promisc,
+		Tstype:       *tstype,
+		DecoderName:  *decoder,
+		Lazy:         *lazy,
+		BpfFilter:    bpffilter,
 	}
 
 	var err error
diff --git a/tap/source/handle_af_packet.go b/tap/source/handle_af_packet.go
new file mode 100644
index 000000000..11e29b9d4
--- /dev/null
+++ b/tap/source/handle_af_packet.go
@@ -0,0 +1,162 @@
+package source
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/afpacket"
+	"github.com/google/gopacket/layers"
+	"github.com/google/gopacket/pcap"
+	"golang.org/x/net/bpf"
+)
+
+// afPacketHandle is the Handle implementation backed by an AF_PACKET socket.
+// Frames are read zero-copy and decoded with the decoder set via SetDecoder.
+type afPacketHandle struct {
+	source        gopacket.ZeroCopyPacketDataSource
+	capture       *afpacket.TPacket
+	decoder       gopacket.Decoder
+	decodeOptions gopacket.DecodeOptions
+}
+
+// NextPacket reads the next frame from the socket and decodes it. On a read
+// error it returns a nil packet and the error instead of decoding the empty
+// buffer that accompanies a failed read.
+func (h *afPacketHandle) NextPacket() (packet gopacket.Packet, err error) {
+	var data []byte
+	var ci gopacket.CaptureInfo
+	data, ci, err = h.source.ZeroCopyReadPacketData()
+	if err != nil {
+		return nil, err
+	}
+
+	packet = gopacket.NewPacket(data, h.decoder, h.decodeOptions)
+	m := packet.Metadata()
+	m.CaptureInfo = ci
+	m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
+	return packet, nil
+}
+
+// SetDecoder stores the decoder and decode options used by NextPacket.
+func (h *afPacketHandle) SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool) {
+	h.decoder = decoder
+	h.decodeOptions = gopacket.DecodeOptions{Lazy: lazy, NoCopy: noCopy}
+}
+
+// SetBPF compiles expr with libpcap and attaches the program to the socket.
+// Compilation and attachment errors are both returned to the caller.
+func (h *afPacketHandle) SetBPF(expr string) (err error) {
+	var pcapBPF []pcap.BPFInstruction
+	pcapBPF, err = pcap.CompileBPFFilter(layers.LinkTypeEthernet, 65535, expr)
+	if err != nil {
+		return err
+	}
+	bpfIns := make([]bpf.RawInstruction, 0, len(pcapBPF))
+	for _, ins := range pcapBPF {
+		bpfIns = append(bpfIns, bpf.RawInstruction{
+			Op: ins.Code,
+			Jt: ins.Jt,
+			Jf: ins.Jf,
+			K:  ins.K,
+		})
+	}
+	return h.capture.SetBPF(bpfIns)
+}
+
+// Close closes the underlying AF_PACKET socket.
+func (h *afPacketHandle) Close() {
+	h.capture.Close()
+}
+
+// newAfpacketHandle opens an AF_PACKET capture on device, sizing the ring
+// buffer from targetSizeMb and snaplen via afpacketComputeSize.
+func newAfpacketHandle(device string, targetSizeMb int, snaplen int) (handle Handle, err error) {
+	// afpacketComputeSize adds a whole extra page when snaplen is an exact
+	// multiple of the page size; subtracting one first avoids that for such
+	// values.
+	snaplen -= 1
+	if snaplen < 0 {
+		snaplen = 0
+	}
+	szFrame, szBlock, numBlocks, err := afpacketComputeSize(targetSizeMb, snaplen, os.Getpagesize())
+	if err != nil {
+		return
+	}
+	var capture *afpacket.TPacket
+	capture, err = newAfpacket(device, szFrame, szBlock, numBlocks, false, pcap.BlockForever)
+	if err != nil {
+		return
+	}
+	handle = &afPacketHandle{
+		capture: capture,
+		source:  gopacket.ZeroCopyPacketDataSource(capture),
+	}
+	return
+}
+
+// newAfpacket creates the TPacket v3 raw socket with the given ring geometry.
+// For the device name "any" no interface option is passed to afpacket.
+func newAfpacket(device string, snaplen int, block_size int, num_blocks int,
+	useVLAN bool, timeout time.Duration) (*afpacket.TPacket, error) {
+
+	var h *afpacket.TPacket
+	var err error
+
+	if device == "any" {
+		h, err = afpacket.NewTPacket(
+			afpacket.OptFrameSize(snaplen),
+			afpacket.OptBlockSize(block_size),
+			afpacket.OptNumBlocks(num_blocks),
+			afpacket.OptAddVLANHeader(useVLAN),
+			afpacket.OptPollTimeout(timeout),
+			afpacket.SocketRaw,
+			afpacket.TPacketVersion3)
+	} else {
+		h, err = afpacket.NewTPacket(
+			afpacket.OptInterface(device),
+			afpacket.OptFrameSize(snaplen),
+			afpacket.OptBlockSize(block_size),
+			afpacket.OptNumBlocks(num_blocks),
+			afpacket.OptAddVLANHeader(useVLAN),
+			afpacket.OptPollTimeout(timeout),
+			afpacket.SocketRaw,
+			afpacket.TPacketVersion3)
+	}
+	return h, err
+}
+
+// afpacketComputeSize computes the block_size and the num_blocks in such a way that the
+// allocated mmap buffer is close to but smaller than target_size_mb.
+// The restriction is that the block_size must be divisible by both the
+// frame size and page size.
+//
+// It returns an error when snaplen or pageSize is not positive, or when the
+// target size cannot hold at least one block.
+func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) (
+	frameSize int, blockSize int, numBlocks int, err error) {
+
+	// Reject non-positive sizes up front: snaplen == 0 would make the
+	// division below panic.
+	if snaplen <= 0 || pageSize <= 0 {
+		return 0, 0, 0, fmt.Errorf("invalid snaplen %d or page size %d", snaplen, pageSize)
+	}
+
+	if snaplen < pageSize {
+		frameSize = pageSize / (pageSize / snaplen)
+	} else {
+		frameSize = (snaplen/pageSize + 1) * pageSize
+	}
+
+	// 128 is the default from the gopacket library so just use that
+	blockSize = frameSize * 128
+	numBlocks = (targetSizeMb * 1024 * 1024) / blockSize
+
+	// <= rather than == so a negative targetSizeMb is rejected as well.
+	if numBlocks <= 0 {
+		return 0, 0, 0, fmt.Errorf("Interface buffersize is too small")
+	}
+
+	return frameSize, blockSize, numBlocks, nil
+}
diff --git a/tap/source/handle_pcap.go b/tap/source/handle_pcap.go
new file mode 100644
index 000000000..607c61d60
--- /dev/null
+++ b/tap/source/handle_pcap.go
@@ -0,0 +1,97 @@
+package source
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/pcap"
+)
+
+// pcapHandle is the Handle implementation backed by libpcap. It is used for
+// offline capture files and for live interfaces.
+type pcapHandle struct {
+	source        *gopacket.PacketSource
+	capture       *pcap.Handle
+	decoder       gopacket.Decoder
+	decodeOptions gopacket.DecodeOptions
+}
+
+// NextPacket returns the next packet from the packet source created by
+// SetDecoder; SetDecoder must be called first.
+func (h *pcapHandle) NextPacket() (packet gopacket.Packet, err error) {
+	return h.source.NextPacket()
+}
+
+// SetDecoder builds the packet source for the capture with the given decoder
+// and decode options.
+func (h *pcapHandle) SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool) {
+	h.source = gopacket.NewPacketSource(h.capture, decoder)
+	h.source.Lazy = lazy
+	h.source.NoCopy = noCopy
+}
+
+// SetBPF applies the BPF filter expression to the capture.
+func (h *pcapHandle) SetBPF(expr string) (err error) {
+	return h.capture.SetBPFFilter(expr)
+}
+
+// Close closes the underlying pcap handle.
+func (h *pcapHandle) Close() {
+	h.capture.Close()
+}
+
+// newPcapHandle opens filename as an offline capture when it is non-empty;
+// otherwise it opens a live capture on device using snaplen, promisc and the
+// optional timestamp source tstype.
+func newPcapHandle(filename string, device string, snaplen int, promisc bool, tstype string) (handle Handle, err error) {
+	var capture *pcap.Handle
+
+	if filename != "" {
+		if capture, err = pcap.OpenOffline(filename); err != nil {
+			err = fmt.Errorf("PCAP OpenOffline error: %v", err)
+			return
+		}
+	} else {
+		// This is a little complicated because we want to allow all possible options
+		// for creating the packet capture handle... instead of all this you can
+		// just call pcap.OpenLive if you want a simple handle.
+		var inactive *pcap.InactiveHandle
+		inactive, err = pcap.NewInactiveHandle(device)
+		if err != nil {
+			err = fmt.Errorf("could not create: %v", err)
+			return
+		}
+		defer inactive.CleanUp()
+		if err = inactive.SetSnapLen(snaplen); err != nil {
+			err = fmt.Errorf("could not set snap length: %v", err)
+			return
+		} else if err = inactive.SetPromisc(promisc); err != nil {
+			err = fmt.Errorf("could not set promisc mode: %v", err)
+			return
+		} else if err = inactive.SetTimeout(time.Second); err != nil {
+			err = fmt.Errorf("could not set timeout: %v", err)
+			return
+		}
+		if tstype != "" {
+			var t pcap.TimestampSource
+			if t, err = pcap.TimestampSourceFromString(tstype); err != nil {
+				err = fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
+				return
+			} else if err = inactive.SetTimestampSource(t); err != nil {
+				err = fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
+				return
+			}
+		}
+		if capture, err = inactive.Activate(); err != nil {
+			err = fmt.Errorf("PCAP Activate error: %v", err)
+			return
+		}
+	}
+
+	handle = &pcapHandle{
+		capture: capture,
+	}
+
+	return
+}
diff --git a/tap/source/tcp_packet_source.go b/tap/source/tcp_packet_source.go
index 5745058e7..c81b776ff 100644
--- a/tap/source/tcp_packet_source.go
+++ b/tap/source/tcp_packet_source.go
@@ -3,23 +3,24 @@ package source
 import (
 	"fmt"
 	"io"
-	"os"
-	"time"
 
 	"github.com/google/gopacket"
-	"github.com/google/gopacket/afpacket"
 	"github.com/google/gopacket/ip4defrag"
 	"github.com/google/gopacket/layers"
-	"github.com/google/gopacket/pcap"
 	"github.com/up9inc/mizu/logger"
 	"github.com/up9inc/mizu/tap/api"
 	"github.com/up9inc/mizu/tap/diagnose"
-	"golang.org/x/net/bpf"
 )
 
+type Handle interface {
+	NextPacket() (packet gopacket.Packet, err error)
+	SetDecoder(decoder gopacket.Decoder, lazy bool, noCopy bool)
+	SetBPF(expr string) (err error)
+	Close()
+}
+
 type tcpPacketSource struct {
-	source    gopacket.ZeroCopyPacketDataSource
-	Handle    *afpacket.TPacket
+	Handle    Handle
 	defragger *ip4defrag.IPv4Defragmenter
 	Behaviour *TcpPacketSourceBehaviour
 	name      string
@@ -27,12 +28,13 @@ type tcpPacketSource struct {
 }
 
 type TcpPacketSourceBehaviour struct {
-	SnapLength  int
-	Promisc     bool
-	Tstype      string
-	DecoderName string
-	Lazy        bool
-	BpfFilter   string
+	SnapLength   int
+	TargetSizeMb int
+	Promisc      bool
+	Tstype       string
+	DecoderName  string
+	Lazy         bool
+	BpfFilter    string
 }
 
 type TcpPacketInfo struct {
@@ -40,61 +42,6 @@ type TcpPacketInfo struct {
 	Source *tcpPacketSource
 }
 
-func newAfpacketHandle(device string, snaplen int, block_size int, num_blocks int,
-	useVLAN bool, timeout time.Duration) (*afpacket.TPacket, error) {
-
-	var h *afpacket.TPacket
-	var err error
-
-	if device == "any" {
-		h, err = afpacket.NewTPacket(
-			afpacket.OptFrameSize(snaplen),
-			afpacket.OptBlockSize(block_size),
-			afpacket.OptNumBlocks(num_blocks),
-			afpacket.OptAddVLANHeader(useVLAN),
-			afpacket.OptPollTimeout(timeout),
-			afpacket.SocketRaw,
-			afpacket.TPacketVersion3)
-	} else {
-		h, err = afpacket.NewTPacket(
-			afpacket.OptInterface(device),
-			afpacket.OptFrameSize(snaplen),
-			afpacket.OptBlockSize(block_size),
-			afpacket.OptNumBlocks(num_blocks),
-			afpacket.OptAddVLANHeader(useVLAN),
-			afpacket.OptPollTimeout(timeout),
-			afpacket.SocketRaw,
-			afpacket.TPacketVersion3)
-	}
-	return h, err
-}
-
-// afpacketComputeSize computes the block_size and the num_blocks in such a way that the
-// allocated mmap buffer is close to but smaller than target_size_mb.
-// The restriction is that the block_size must be divisible by both the
-// frame size and page size.
-func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) (
-	frameSize int, blockSize int, numBlocks int, err error) {
-
-	fmt.Printf("[afpacketComputeSize] targetSizeMb: %v snaplen: %v pageSize: %v\n", targetSizeMb, snaplen, pageSize)
-
-	if snaplen < pageSize {
-		frameSize = pageSize / (pageSize / snaplen)
-	} else {
-		frameSize = (snaplen/pageSize + 1) * pageSize
-	}
-
-	// 128 is the default from the gopacket library so just use that
-	blockSize = frameSize * 128
-	numBlocks = (targetSizeMb * 1024 * 1024) / blockSize
-
-	if numBlocks == 0 {
-		return 0, 0, 0, fmt.Errorf("Interface buffersize is too small")
-	}
-
-	return frameSize, blockSize, numBlocks, nil
-}
-
 func newTcpPacketSource(name, filename string, interfaceName string,
 	behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
 	var err error
@@ -106,14 +53,32 @@ func newTcpPacketSource(name, filename string, interfaceName string,
 		Origin:    origin,
 	}
 
-	szFrame, szBlock, numBlocks, err := afpacketComputeSize(8, 65535, os.Getpagesize())
+	result.Handle, err = newAfpacketHandle(
+		interfaceName,
+		behaviour.TargetSizeMb,
+		behaviour.SnapLength,
+	)
 	if err != nil {
-		panic(err)
-	}
-	result.Handle, err = newAfpacketHandle(interfaceName, szFrame, szBlock, numBlocks, false, pcap.BlockForever)
-	if err != nil {
-		panic(err)
+		logger.Log.Warning(err)
+		result.Handle, err = newPcapHandle(
+			filename,
+			interfaceName,
+			behaviour.SnapLength,
+			behaviour.Promisc,
+			behaviour.Tstype,
+		)
+		if err != nil {
+			return nil, err
+		} else {
+			logger.Log.Infof("Using libpcap as the capture source")
+		}
+	} else {
+		logger.Log.Infof("Using AF_PACKET socket as the capture source")
 	}
+
+	decoder := gopacket.DecodersByLayerName[fmt.Sprintf("%s", layers.LinkTypeEthernet)]
+	result.Handle.SetDecoder(decoder, behaviour.Lazy, true)
+
 	if behaviour.BpfFilter != "" {
 		logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
 		if err = result.setBPFFilter(behaviour.BpfFilter); err != nil {
@@ -121,8 +86,6 @@ func newTcpPacketSource(name, filename string, interfaceName string,
 		}
 	}
 
-	result.source = gopacket.ZeroCopyPacketDataSource(result.Handle)
-
 	return result, nil
 }
 
@@ -131,21 +94,7 @@ func (source *tcpPacketSource) String() string {
 }
 
 func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
-	pcapBPF, err := pcap.CompileBPFFilter(layers.LinkTypeEthernet, 65535, expr)
-	if err != nil {
-		panic(err)
-	}
-	bpfIns := []bpf.RawInstruction{}
-	for _, ins := range pcapBPF {
-		bpfIns2 := bpf.RawInstruction{
-			Op: ins.Code,
-			Jt: ins.Jt,
-			Jf: ins.Jf,
-			K:  ins.K,
-		}
-		bpfIns = append(bpfIns, bpfIns2)
-	}
-	return source.Handle.SetBPF(bpfIns)
+	return source.Handle.SetBPF(expr)
 }
 
 func (source *tcpPacketSource) close() {
@@ -158,15 +107,7 @@ func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPack
 	logger.Log.Infof("Start reading packets from %v", source.name)
 
 	for {
-		data, ci, err := source.source.ZeroCopyReadPacketData()
-
-		decoder := gopacket.DecodersByLayerName[fmt.Sprintf("%s", layers.LinkTypeEthernet)]
-		decodeOptions := gopacket.DecodeOptions{Lazy: false, NoCopy: true}
-
-		packet := gopacket.NewPacket(data, decoder, decodeOptions)
-		m := packet.Metadata()
-		m.CaptureInfo = ci
-		m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
+		packet, err := source.Handle.NextPacket()
 
 		if err == io.EOF {
 			logger.Log.Infof("Got EOF while reading packets from %v", source.name)