mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-05-01 05:15:49 +00:00
* close gopacket conn immediately * increase last ack threshold to 3 seconds * remove empty condition * add periodic stats * protect connections from concurrent updates * implement basic throttling base on live streams count * remove assembler mutex * pr fixes * change max conns default to 500 * create connectionId type * fix linter
179 lines
4.4 KiB
Go
179 lines
4.4 KiB
Go
package source
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
v1 "k8s.io/api/core/v1"
|
|
)
|
|
|
|
const (
	// bpfFilterMaxPods is the largest number of pods for which setBPFFilter
	// builds a per-pod host expression; above it only the port exclusion is used.
	bpfFilterMaxPods = 150

	// hostSourcePid is the map key under which the host packet source is
	// registered in PacketSourceManager.sources.
	hostSourcePid = "0"
)
|
|
|
|
type PacketSourceManagerConfig struct {
|
|
mtls bool
|
|
procfs string
|
|
interfaceName string
|
|
behaviour TcpPacketSourceBehaviour
|
|
}
|
|
|
|
type PacketSourceManager struct {
|
|
sources map[string]*tcpPacketSource
|
|
config PacketSourceManagerConfig
|
|
}
|
|
|
|
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
|
|
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) {
|
|
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sourceManager := &PacketSourceManager{
|
|
sources: map[string]*tcpPacketSource{
|
|
hostSourcePid: hostSource,
|
|
},
|
|
}
|
|
|
|
sourceManager.config = PacketSourceManagerConfig{
|
|
mtls: mtls,
|
|
procfs: procfs,
|
|
interfaceName: interfaceName,
|
|
behaviour: behaviour,
|
|
}
|
|
|
|
go hostSource.readPackets(ipdefrag, packets)
|
|
return sourceManager, nil
|
|
}
|
|
|
|
func newHostPacketSource(filename string, interfaceName string,
|
|
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
|
var name string
|
|
if filename == "" {
|
|
name = fmt.Sprintf("host-%s", interfaceName)
|
|
} else {
|
|
name = fmt.Sprintf("file-%s", filename)
|
|
}
|
|
|
|
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour, api.Pcap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return source, nil
|
|
}
|
|
|
|
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
|
if m.config.mtls {
|
|
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour, ipdefrag, packets)
|
|
}
|
|
|
|
m.setBPFFilter(pods)
|
|
}
|
|
|
|
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
|
interfaceName string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
|
|
|
relevantPids := m.getRelevantPids(procfs, pods)
|
|
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
|
|
|
|
for pid, src := range m.sources {
|
|
if _, ok := relevantPids[pid]; !ok {
|
|
src.close()
|
|
delete(m.sources, pid)
|
|
}
|
|
}
|
|
|
|
for pid, origin := range relevantPids {
|
|
if _, ok := m.sources[pid]; !ok {
|
|
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour, origin)
|
|
|
|
if err == nil {
|
|
go source.readPackets(ipdefrag, packets)
|
|
m.sources[pid] = source
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]api.Capture {
|
|
relevantPids := make(map[string]api.Capture)
|
|
relevantPids[hostSourcePid] = api.Pcap
|
|
|
|
if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
|
|
logger.Log.Warningf("Unable to discover envoy pids - %w", err)
|
|
} else {
|
|
for _, pid := range envoyPids {
|
|
relevantPids[pid] = api.Envoy
|
|
}
|
|
}
|
|
|
|
if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
|
|
logger.Log.Warningf("Unable to discover linkerd pids - %w", err)
|
|
} else {
|
|
for _, pid := range linkerdPids {
|
|
relevantPids[pid] = api.Linkerd
|
|
}
|
|
}
|
|
|
|
return relevantPids
|
|
}
|
|
|
|
func buildBPFExpr(pods []v1.Pod) string {
|
|
hostsFilter := make([]string, 0)
|
|
|
|
for _, pod := range pods {
|
|
hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", pod.Status.PodIP))
|
|
}
|
|
|
|
return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
|
|
}
|
|
|
|
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
|
|
if len(pods) == 0 {
|
|
logger.Log.Info("No pods provided, skipping pcap bpf filter")
|
|
return
|
|
}
|
|
|
|
var expr string
|
|
|
|
if len(pods) > bpfFilterMaxPods {
|
|
logger.Log.Info("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
|
|
expr = "port not 443"
|
|
} else {
|
|
expr = buildBPFExpr(pods)
|
|
}
|
|
|
|
logger.Log.Infof("Setting pcap bpf filter %s", expr)
|
|
|
|
for pid, src := range m.sources {
|
|
if err := src.setBPFFilter(expr); err != nil {
|
|
logger.Log.Warningf("Error setting bpf filter for %s %v - %w", pid, src, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *PacketSourceManager) Close() {
|
|
for _, src := range m.sources {
|
|
src.close()
|
|
}
|
|
}
|
|
|
|
func (m *PacketSourceManager) Stats() string {
|
|
result := ""
|
|
|
|
for _, source := range m.sources {
|
|
stats, err := source.Stats()
|
|
|
|
if err != nil {
|
|
result = result + fmt.Sprintf("[%s: err:%s]", source.String(), err)
|
|
} else {
|
|
result = result + fmt.Sprintf("[%s: rec: %d dropped: %d]", source.String(), stats.PacketsReceived, stats.PacketsDropped)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|