From a4b942ac1e971efbb66869210cedbc7e3e26fd3a Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Thu, 28 Apr 2022 18:13:44 +0300 Subject: [PATCH] Add `AF_PACKET` support --- Dockerfile | 2 +- tap/passive_tapper.go | 2 +- tap/settings.go | 4 +- tap/source/tcp_packet_source.go | 143 +++++++++++++++++++++----------- 4 files changed, 99 insertions(+), 52 deletions(-) diff --git a/Dockerfile b/Dockerfile index c734d16ec..0c74b3979 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,7 +25,7 @@ RUN npm run build ### Base builder image for native builds architecture FROM golang:1.17-alpine AS builder-native-base ENV CGO_ENABLED=1 GOOS=linux -RUN apk add --no-cache libpcap-dev g++ perl-utils +RUN apk add --no-cache libpcap-dev g++ perl-utils linux-headers ### Intermediate builder image for x86-64 to x86-64 native builds diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index e310a931e..f84b4a9a8 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -187,7 +187,7 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI diagnose.InitializeErrorsMap(*debug, *verbose, *quiet) diagnose.InitializeTapperInternalStats() - mainPacketInputChan = make(chan source.TcpPacketInfo) + mainPacketInputChan = make(chan source.TcpPacketInfo, 1000) if err := initializePacketSources(); err != nil { logger.Log.Fatal(err) diff --git a/tap/settings.go b/tap/settings.go index 32b0e988f..7ddbdddad 100644 --- a/tap/settings.go +++ b/tap/settings.go @@ -14,8 +14,8 @@ const ( MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL" MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL" MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION" - MaxBufferedPagesTotalDefaultValue = 5000 - MaxBufferedPagesPerConnectionDefaultValue = 5000 + MaxBufferedPagesTotalDefaultValue = 0 + MaxBufferedPagesPerConnectionDefaultValue = 0 TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS" TcpStreamChannelTimeoutMsDefaultValue = 10000 CloseTimedoutTcpChannelsIntervalMsEnvVarName = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS" diff --git a/tap/source/tcp_packet_source.go b/tap/source/tcp_packet_source.go index 315b69d54..5745058e7 100644 --- a/tap/source/tcp_packet_source.go +++ b/tap/source/tcp_packet_source.go @@ -3,20 +3,23 @@ package source import ( "fmt" "io" + "os" "time" "github.com/google/gopacket" + "github.com/google/gopacket/afpacket" "github.com/google/gopacket/ip4defrag" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "github.com/up9inc/mizu/logger" "github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/diagnose" + "golang.org/x/net/bpf" ) type tcpPacketSource struct { - source *gopacket.PacketSource - handle *pcap.Handle + source gopacket.ZeroCopyPacketDataSource + Handle *afpacket.TPacket defragger *ip4defrag.IPv4Defragmenter Behaviour *TcpPacketSourceBehaviour name string @@ -37,6 +40,61 @@ type TcpPacketInfo struct { Source *tcpPacketSource } +func newAfpacketHandle(device string, snaplen int, block_size int, num_blocks int, + useVLAN bool, timeout time.Duration) (*afpacket.TPacket, error) { + + var h *afpacket.TPacket + var err error + + if device == "any" { + h, err = afpacket.NewTPacket( + afpacket.OptFrameSize(snaplen), + afpacket.OptBlockSize(block_size), + afpacket.OptNumBlocks(num_blocks), + afpacket.OptAddVLANHeader(useVLAN), + afpacket.OptPollTimeout(timeout), + afpacket.SocketRaw, + afpacket.TPacketVersion3) + } else { + h, err = afpacket.NewTPacket( + afpacket.OptInterface(device), + afpacket.OptFrameSize(snaplen), + afpacket.OptBlockSize(block_size), + afpacket.OptNumBlocks(num_blocks), + afpacket.OptAddVLANHeader(useVLAN), + afpacket.OptPollTimeout(timeout), + afpacket.SocketRaw, + afpacket.TPacketVersion3) + } + return h, err +} + +// afpacketComputeSize computes the block_size and the num_blocks in such a way that the +// allocated mmap buffer is close to but smaller than target_size_mb. +// The restriction is that the block_size must be divisible by both the +// frame size and page size. +func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) ( + frameSize int, blockSize int, numBlocks int, err error) { + + fmt.Printf("[afpacketComputeSize] targetSizeMb: %v snaplen: %v pageSize: %v\n", targetSizeMb, snaplen, pageSize) + + if snaplen < pageSize { + frameSize = pageSize / (pageSize / snaplen) + } else { + frameSize = (snaplen/pageSize + 1) * pageSize + } + + // 128 is the default from the gopacket library so just use that + blockSize = frameSize * 128 + numBlocks = (targetSizeMb * 1024 * 1024) / blockSize + + if numBlocks == 0 { + return 0, 0, 0, fmt.Errorf("Interface buffersize is too small") + } + + return frameSize, blockSize, numBlocks, nil +} + func newTcpPacketSource(name, filename string, interfaceName string, behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) { var err error @@ -48,55 +106,22 @@ func newTcpPacketSource(name, filename string, interfaceName string, Origin: origin, } - if filename != "" { - if result.handle, err = pcap.OpenOffline(filename); err != nil { - return result, fmt.Errorf("PCAP OpenOffline error: %v", err) - } - } else { - // This is a little complicated because we want to allow all possible options - // for creating the packet capture handle... instead of all this you can - // just call pcap.OpenLive if you want a simple handle. - inactive, err := pcap.NewInactiveHandle(interfaceName) - if err != nil { - return result, fmt.Errorf("could not create: %v", err) - } - defer inactive.CleanUp() - if err = inactive.SetSnapLen(behaviour.SnapLength); err != nil { - return result, fmt.Errorf("could not set snap length: %v", err) - } else if err = inactive.SetPromisc(behaviour.Promisc); err != nil { - return result, fmt.Errorf("could not set promisc mode: %v", err) - } else if err = inactive.SetTimeout(time.Second); err != nil { - return result, fmt.Errorf("could not set timeout: %v", err) - } - if behaviour.Tstype != "" { - if t, err := pcap.TimestampSourceFromString(behaviour.Tstype); err != nil { - return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps()) - } else if err := inactive.SetTimestampSource(t); err != nil { - return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps()) - } - } - if result.handle, err = inactive.Activate(); err != nil { - return result, fmt.Errorf("PCAP Activate error: %v", err) - } + szFrame, szBlock, numBlocks, err := afpacketComputeSize(8, 65535, os.Getpagesize()) + if err != nil { + panic(err) + } + result.Handle, err = newAfpacketHandle(interfaceName, szFrame, szBlock, numBlocks, false, pcap.BlockForever) + if err != nil { + panic(err) } if behaviour.BpfFilter != "" { logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter) - if err = result.handle.SetBPFFilter(behaviour.BpfFilter); err != nil { + if err = result.setBPFFilter(behaviour.BpfFilter); err != nil { return nil, fmt.Errorf("BPF filter error: %v", err) } } - var dec gopacket.Decoder - var ok bool - if behaviour.DecoderName == "" { - behaviour.DecoderName = result.handle.LinkType().String() - } - if dec, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok { - return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName) - } - result.source = gopacket.NewPacketSource(result.handle, dec) - result.source.Lazy = behaviour.Lazy - result.source.NoCopy = true + result.source = gopacket.ZeroCopyPacketDataSource(result.Handle) return result, nil } @@ -106,12 +131,26 @@ func (source *tcpPacketSource) String() string { } func (source *tcpPacketSource) setBPFFilter(expr string) (err error) { - return source.handle.SetBPFFilter(expr) + pcapBPF, err := pcap.CompileBPFFilter(layers.LinkTypeEthernet, 65535, expr) + if err != nil { + panic(err) + } + bpfIns := []bpf.RawInstruction{} + for _, ins := range pcapBPF { + bpfIns2 := bpf.RawInstruction{ + Op: ins.Code, + Jt: ins.Jt, + Jf: ins.Jf, + K: ins.K, + } + bpfIns = append(bpfIns, bpfIns2) + } + return source.Handle.SetBPF(bpfIns) } func (source *tcpPacketSource) close() { - if source.handle != nil { - source.handle.Close() + if source.Handle != nil { + source.Handle.Close() } } @@ -119,7 +158,15 @@ func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPack logger.Log.Infof("Start reading packets from %v", source.name) for { - packet, err := source.source.NextPacket() + data, ci, err := source.source.ZeroCopyReadPacketData() + + decoder := gopacket.DecodersByLayerName[fmt.Sprintf("%s", layers.LinkTypeEthernet)] + decodeOptions := gopacket.DecodeOptions{Lazy: false, NoCopy: true} + + packet := gopacket.NewPacket(data, decoder, decodeOptions) + m := packet.Metadata() + m.CaptureInfo = ci + m.Truncated = m.Truncated || ci.CaptureLength < ci.Length if err == io.EOF { logger.Log.Infof("Got EOF while reading packets from %v", source.name)