mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-21 09:53:46 +00:00
Add AF_PACKET
support
This commit is contained in:
parent
d3e6a69d82
commit
a4b942ac1e
@ -25,7 +25,7 @@ RUN npm run build
|
|||||||
### Base builder image for native builds architecture
|
### Base builder image for native builds architecture
|
||||||
FROM golang:1.17-alpine AS builder-native-base
|
FROM golang:1.17-alpine AS builder-native-base
|
||||||
ENV CGO_ENABLED=1 GOOS=linux
|
ENV CGO_ENABLED=1 GOOS=linux
|
||||||
RUN apk add --no-cache libpcap-dev g++ perl-utils
|
RUN apk add --no-cache libpcap-dev g++ perl-utils linux-headers
|
||||||
|
|
||||||
|
|
||||||
### Intermediate builder image for x86-64 to x86-64 native builds
|
### Intermediate builder image for x86-64 to x86-64 native builds
|
||||||
|
@ -187,7 +187,7 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
|
|||||||
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
||||||
diagnose.InitializeTapperInternalStats()
|
diagnose.InitializeTapperInternalStats()
|
||||||
|
|
||||||
mainPacketInputChan = make(chan source.TcpPacketInfo)
|
mainPacketInputChan = make(chan source.TcpPacketInfo, 1000)
|
||||||
|
|
||||||
if err := initializePacketSources(); err != nil {
|
if err := initializePacketSources(); err != nil {
|
||||||
logger.Log.Fatal(err)
|
logger.Log.Fatal(err)
|
||||||
|
@ -14,8 +14,8 @@ const (
|
|||||||
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
|
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
|
||||||
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
|
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
|
||||||
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
|
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
|
||||||
MaxBufferedPagesTotalDefaultValue = 5000
|
MaxBufferedPagesTotalDefaultValue = 0
|
||||||
MaxBufferedPagesPerConnectionDefaultValue = 5000
|
MaxBufferedPagesPerConnectionDefaultValue = 0
|
||||||
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
|
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
|
||||||
TcpStreamChannelTimeoutMsDefaultValue = 10000
|
TcpStreamChannelTimeoutMsDefaultValue = 10000
|
||||||
CloseTimedoutTcpChannelsIntervalMsEnvVarName = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS"
|
CloseTimedoutTcpChannelsIntervalMsEnvVarName = "CLOSE_TIMEDOUT_TCP_STREAM_CHANNELS_INTERVAL_MS"
|
||||||
|
@ -3,20 +3,23 @@ package source
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/gopacket"
|
"github.com/google/gopacket"
|
||||||
|
"github.com/google/gopacket/afpacket"
|
||||||
"github.com/google/gopacket/ip4defrag"
|
"github.com/google/gopacket/ip4defrag"
|
||||||
"github.com/google/gopacket/layers"
|
"github.com/google/gopacket/layers"
|
||||||
"github.com/google/gopacket/pcap"
|
"github.com/google/gopacket/pcap"
|
||||||
"github.com/up9inc/mizu/logger"
|
"github.com/up9inc/mizu/logger"
|
||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
"github.com/up9inc/mizu/tap/diagnose"
|
"github.com/up9inc/mizu/tap/diagnose"
|
||||||
|
"golang.org/x/net/bpf"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tcpPacketSource struct {
|
type tcpPacketSource struct {
|
||||||
source *gopacket.PacketSource
|
source gopacket.ZeroCopyPacketDataSource
|
||||||
handle *pcap.Handle
|
Handle *afpacket.TPacket
|
||||||
defragger *ip4defrag.IPv4Defragmenter
|
defragger *ip4defrag.IPv4Defragmenter
|
||||||
Behaviour *TcpPacketSourceBehaviour
|
Behaviour *TcpPacketSourceBehaviour
|
||||||
name string
|
name string
|
||||||
@ -37,6 +40,61 @@ type TcpPacketInfo struct {
|
|||||||
Source *tcpPacketSource
|
Source *tcpPacketSource
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newAfpacketHandle(device string, snaplen int, block_size int, num_blocks int,
|
||||||
|
useVLAN bool, timeout time.Duration) (*afpacket.TPacket, error) {
|
||||||
|
|
||||||
|
var h *afpacket.TPacket
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if device == "any" {
|
||||||
|
h, err = afpacket.NewTPacket(
|
||||||
|
afpacket.OptFrameSize(snaplen),
|
||||||
|
afpacket.OptBlockSize(block_size),
|
||||||
|
afpacket.OptNumBlocks(num_blocks),
|
||||||
|
afpacket.OptAddVLANHeader(useVLAN),
|
||||||
|
afpacket.OptPollTimeout(timeout),
|
||||||
|
afpacket.SocketRaw,
|
||||||
|
afpacket.TPacketVersion3)
|
||||||
|
} else {
|
||||||
|
h, err = afpacket.NewTPacket(
|
||||||
|
afpacket.OptInterface(device),
|
||||||
|
afpacket.OptFrameSize(snaplen),
|
||||||
|
afpacket.OptBlockSize(block_size),
|
||||||
|
afpacket.OptNumBlocks(num_blocks),
|
||||||
|
afpacket.OptAddVLANHeader(useVLAN),
|
||||||
|
afpacket.OptPollTimeout(timeout),
|
||||||
|
afpacket.SocketRaw,
|
||||||
|
afpacket.TPacketVersion3)
|
||||||
|
}
|
||||||
|
return h, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// afpacketComputeSize computes the block_size and the num_blocks in such a way that the
|
||||||
|
// allocated mmap buffer is close to but smaller than target_size_mb.
|
||||||
|
// The restriction is that the block_size must be divisible by both the
|
||||||
|
// frame size and page size.
|
||||||
|
func afpacketComputeSize(targetSizeMb int, snaplen int, pageSize int) (
|
||||||
|
frameSize int, blockSize int, numBlocks int, err error) {
|
||||||
|
|
||||||
|
fmt.Printf("[afpacketComputeSize] targetSizeMb: %v snaplen: %v pageSize: %v\n", targetSizeMb, snaplen, pageSize)
|
||||||
|
|
||||||
|
if snaplen < pageSize {
|
||||||
|
frameSize = pageSize / (pageSize / snaplen)
|
||||||
|
} else {
|
||||||
|
frameSize = (snaplen/pageSize + 1) * pageSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// 128 is the default from the gopacket library so just use that
|
||||||
|
blockSize = frameSize * 128
|
||||||
|
numBlocks = (targetSizeMb * 1024 * 1024) / blockSize
|
||||||
|
|
||||||
|
if numBlocks == 0 {
|
||||||
|
return 0, 0, 0, fmt.Errorf("Interface buffersize is too small")
|
||||||
|
}
|
||||||
|
|
||||||
|
return frameSize, blockSize, numBlocks, nil
|
||||||
|
}
|
||||||
|
|
||||||
func newTcpPacketSource(name, filename string, interfaceName string,
|
func newTcpPacketSource(name, filename string, interfaceName string,
|
||||||
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
behaviour TcpPacketSourceBehaviour, origin api.Capture) (*tcpPacketSource, error) {
|
||||||
var err error
|
var err error
|
||||||
@ -48,55 +106,22 @@ func newTcpPacketSource(name, filename string, interfaceName string,
|
|||||||
Origin: origin,
|
Origin: origin,
|
||||||
}
|
}
|
||||||
|
|
||||||
if filename != "" {
|
szFrame, szBlock, numBlocks, err := afpacketComputeSize(8, 65535, os.Getpagesize())
|
||||||
if result.handle, err = pcap.OpenOffline(filename); err != nil {
|
|
||||||
return result, fmt.Errorf("PCAP OpenOffline error: %v", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// This is a little complicated because we want to allow all possible options
|
|
||||||
// for creating the packet capture handle... instead of all this you can
|
|
||||||
// just call pcap.OpenLive if you want a simple handle.
|
|
||||||
inactive, err := pcap.NewInactiveHandle(interfaceName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, fmt.Errorf("could not create: %v", err)
|
panic(err)
|
||||||
}
|
|
||||||
defer inactive.CleanUp()
|
|
||||||
if err = inactive.SetSnapLen(behaviour.SnapLength); err != nil {
|
|
||||||
return result, fmt.Errorf("could not set snap length: %v", err)
|
|
||||||
} else if err = inactive.SetPromisc(behaviour.Promisc); err != nil {
|
|
||||||
return result, fmt.Errorf("could not set promisc mode: %v", err)
|
|
||||||
} else if err = inactive.SetTimeout(time.Second); err != nil {
|
|
||||||
return result, fmt.Errorf("could not set timeout: %v", err)
|
|
||||||
}
|
|
||||||
if behaviour.Tstype != "" {
|
|
||||||
if t, err := pcap.TimestampSourceFromString(behaviour.Tstype); err != nil {
|
|
||||||
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
|
||||||
} else if err := inactive.SetTimestampSource(t); err != nil {
|
|
||||||
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if result.handle, err = inactive.Activate(); err != nil {
|
|
||||||
return result, fmt.Errorf("PCAP Activate error: %v", err)
|
|
||||||
}
|
}
|
||||||
|
result.Handle, err = newAfpacketHandle(interfaceName, szFrame, szBlock, numBlocks, false, pcap.BlockForever)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
}
|
}
|
||||||
if behaviour.BpfFilter != "" {
|
if behaviour.BpfFilter != "" {
|
||||||
logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
|
logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
|
||||||
if err = result.handle.SetBPFFilter(behaviour.BpfFilter); err != nil {
|
if err = result.setBPFFilter(behaviour.BpfFilter); err != nil {
|
||||||
return nil, fmt.Errorf("BPF filter error: %v", err)
|
return nil, fmt.Errorf("BPF filter error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var dec gopacket.Decoder
|
result.source = gopacket.ZeroCopyPacketDataSource(result.Handle)
|
||||||
var ok bool
|
|
||||||
if behaviour.DecoderName == "" {
|
|
||||||
behaviour.DecoderName = result.handle.LinkType().String()
|
|
||||||
}
|
|
||||||
if dec, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok {
|
|
||||||
return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName)
|
|
||||||
}
|
|
||||||
result.source = gopacket.NewPacketSource(result.handle, dec)
|
|
||||||
result.source.Lazy = behaviour.Lazy
|
|
||||||
result.source.NoCopy = true
|
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -106,12 +131,26 @@ func (source *tcpPacketSource) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
|
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
|
||||||
return source.handle.SetBPFFilter(expr)
|
pcapBPF, err := pcap.CompileBPFFilter(layers.LinkTypeEthernet, 65535, expr)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
bpfIns := []bpf.RawInstruction{}
|
||||||
|
for _, ins := range pcapBPF {
|
||||||
|
bpfIns2 := bpf.RawInstruction{
|
||||||
|
Op: ins.Code,
|
||||||
|
Jt: ins.Jt,
|
||||||
|
Jf: ins.Jf,
|
||||||
|
K: ins.K,
|
||||||
|
}
|
||||||
|
bpfIns = append(bpfIns, bpfIns2)
|
||||||
|
}
|
||||||
|
return source.Handle.SetBPF(bpfIns)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (source *tcpPacketSource) close() {
|
func (source *tcpPacketSource) close() {
|
||||||
if source.handle != nil {
|
if source.Handle != nil {
|
||||||
source.handle.Close()
|
source.Handle.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,7 +158,15 @@ func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- TcpPack
|
|||||||
logger.Log.Infof("Start reading packets from %v", source.name)
|
logger.Log.Infof("Start reading packets from %v", source.name)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
packet, err := source.source.NextPacket()
|
data, ci, err := source.source.ZeroCopyReadPacketData()
|
||||||
|
|
||||||
|
decoder := gopacket.DecodersByLayerName[fmt.Sprintf("%s", layers.LinkTypeEthernet)]
|
||||||
|
decodeOptions := gopacket.DecodeOptions{Lazy: false, NoCopy: true}
|
||||||
|
|
||||||
|
packet := gopacket.NewPacket(data, decoder, decodeOptions)
|
||||||
|
m := packet.Metadata()
|
||||||
|
m.CaptureInfo = ci
|
||||||
|
m.Truncated = m.Truncated || ci.CaptureLength < ci.Length
|
||||||
|
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
logger.Log.Infof("Got EOF while reading packets from %v", source.name)
|
logger.Log.Infof("Got EOF while reading packets from %v", source.name)
|
||||||
|
Loading…
Reference in New Issue
Block a user