kubeshark/tap/source/packet_source_manager.go
David Levanon 99cb0b4f44
Close gopacket connection immediately, basic throttling and assembler mutex removal
* close gopacket conn immediately

* increase last ack threshold to 3 seconds

* remove empty condition

* add periodic stats

* protect connections from concurrent updates

* implement basic throttling base on live streams count

* remove assembler mutex

* pr fixes

* change max conns default to 500

* create connectionId type

* fix linter
2022-06-19 16:47:03 +03:00

179 lines
4.4 KiB
Go

package source
import (
"fmt"
"strings"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
v1 "k8s.io/api/core/v1"
)
const bpfFilterMaxPods = 150
const hostSourcePid = "0"
type PacketSourceManagerConfig struct {
mtls bool
procfs string
interfaceName string
behaviour TcpPacketSourceBehaviour
}
type PacketSourceManager struct {
sources map[string]*tcpPacketSource
config PacketSourceManagerConfig
}
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) {
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return nil, err
}
sourceManager := &PacketSourceManager{
sources: map[string]*tcpPacketSource{
hostSourcePid: hostSource,
},
}
sourceManager.config = PacketSourceManagerConfig{
mtls: mtls,
procfs: procfs,
interfaceName: interfaceName,
behaviour: behaviour,
}
go hostSource.readPackets(ipdefrag, packets)
return sourceManager, nil
}
func newHostPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
var name string
if filename == "" {
name = fmt.Sprintf("host-%s", interfaceName)
} else {
name = fmt.Sprintf("file-%s", filename)
}
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour, api.Pcap)
if err != nil {
return nil, err
}
return source, nil
}
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) {
if m.config.mtls {
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour, ipdefrag, packets)
}
m.setBPFFilter(pods)
}
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) {
relevantPids := m.getRelevantPids(procfs, pods)
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
for pid, src := range m.sources {
if _, ok := relevantPids[pid]; !ok {
src.close()
delete(m.sources, pid)
}
}
for pid, origin := range relevantPids {
if _, ok := m.sources[pid]; !ok {
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour, origin)
if err == nil {
go source.readPackets(ipdefrag, packets)
m.sources[pid] = source
}
}
}
}
func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]api.Capture {
relevantPids := make(map[string]api.Capture)
relevantPids[hostSourcePid] = api.Pcap
if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %w", err)
} else {
for _, pid := range envoyPids {
relevantPids[pid] = api.Envoy
}
}
if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
logger.Log.Warningf("Unable to discover linkerd pids - %w", err)
} else {
for _, pid := range linkerdPids {
relevantPids[pid] = api.Linkerd
}
}
return relevantPids
}
func buildBPFExpr(pods []v1.Pod) string {
hostsFilter := make([]string, 0)
for _, pod := range pods {
hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", pod.Status.PodIP))
}
return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
}
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
if len(pods) == 0 {
logger.Log.Info("No pods provided, skipping pcap bpf filter")
return
}
var expr string
if len(pods) > bpfFilterMaxPods {
logger.Log.Info("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
expr = "port not 443"
} else {
expr = buildBPFExpr(pods)
}
logger.Log.Infof("Setting pcap bpf filter %s", expr)
for pid, src := range m.sources {
if err := src.setBPFFilter(expr); err != nil {
logger.Log.Warningf("Error setting bpf filter for %s %v - %w", pid, src, err)
}
}
}
func (m *PacketSourceManager) Close() {
for _, src := range m.sources {
src.close()
}
}
func (m *PacketSourceManager) Stats() string {
result := ""
for _, source := range m.sources {
stats, err := source.Stats()
if err != nil {
result = result + fmt.Sprintf("[%s: err:%s]", source.String(), err)
} else {
result = result + fmt.Sprintf("[%s: rec: %d dropped: %d]", source.String(), stats.PacketsReceived, stats.PacketsDropped)
}
}
return result
}