mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-13 06:08:15 +00:00
213 lines
5.3 KiB
Go
213 lines
5.3 KiB
Go
package source
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/up9inc/mizu/shared/logger"
|
|
"github.com/vishvananda/netns"
|
|
v1 "k8s.io/api/core/v1"
|
|
)
|
|
|
|
type PacketSourceManager struct {
|
|
sources []*tcpPacketSource
|
|
}
|
|
|
|
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
|
|
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
|
sources := make([]*tcpPacketSource, 0)
|
|
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
|
|
sources = createSourcesFromEnvoy(sources, mtls, procfs, pods, interfaceName, behaviour)
|
|
sources = createSourcesFromLinkerd(sources, mtls, procfs, pods, interfaceName, behaviour)
|
|
|
|
return &PacketSourceManager{
|
|
sources: sources,
|
|
}, nil
|
|
}
|
|
|
|
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
|
|
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
|
|
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
|
|
|
if err != nil {
|
|
return sources, err
|
|
}
|
|
|
|
return append(sources, hostSource), nil
|
|
}
|
|
|
|
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
|
|
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
|
if pids == "" {
|
|
return sources
|
|
}
|
|
|
|
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
|
|
sources = append(sources, netnsSources...)
|
|
return sources
|
|
}
|
|
|
|
func createSourcesFromEnvoy(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
|
|
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
|
if !mtls {
|
|
return sources
|
|
}
|
|
|
|
envoyPids, err := discoverRelevantEnvoyPids(procfs, pods)
|
|
|
|
if err != nil {
|
|
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
|
|
return sources
|
|
}
|
|
|
|
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
|
|
sources = append(sources, netnsSources...)
|
|
|
|
return sources
|
|
}
|
|
|
|
func createSourcesFromLinkerd(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
|
|
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
|
if !mtls {
|
|
return sources
|
|
}
|
|
|
|
linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods)
|
|
|
|
if err != nil {
|
|
logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
|
|
return sources
|
|
}
|
|
|
|
netnsSources := newNetnsPacketSources(procfs, linkerdPids, interfaceName, behaviour)
|
|
sources = append(sources, netnsSources...)
|
|
|
|
return sources
|
|
}
|
|
|
|
func newHostPacketSource(filename string, interfaceName string,
|
|
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
|
var name string
|
|
|
|
if filename == "" {
|
|
name = fmt.Sprintf("host-%v", interfaceName)
|
|
} else {
|
|
name = fmt.Sprintf("file-%v", filename)
|
|
}
|
|
|
|
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return source, nil
|
|
}
|
|
|
|
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
|
|
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
|
result := make([]*tcpPacketSource, 0)
|
|
|
|
for _, pidstr := range pids {
|
|
pid, err := strconv.Atoi(pidstr)
|
|
|
|
if err != nil {
|
|
logger.Log.Errorf("Invalid PID: %v - %v", pid, err)
|
|
continue
|
|
}
|
|
|
|
nsh, err := netns.GetFromPath(fmt.Sprintf("%v/%v/ns/net", procfs, pid))
|
|
|
|
if err != nil {
|
|
logger.Log.Errorf("Unable to get netns of pid %v - %v", pid, err)
|
|
continue
|
|
}
|
|
|
|
src, err := newNetnsPacketSource(pid, nsh, interfaceName, behaviour)
|
|
|
|
if err != nil {
|
|
logger.Log.Errorf("Error starting netns packet source for %v - %v", pid, err)
|
|
continue
|
|
}
|
|
|
|
result = append(result, src)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
|
|
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
|
|
|
done := make(chan *tcpPacketSource)
|
|
errors := make(chan error)
|
|
|
|
go func(done chan<- *tcpPacketSource) {
|
|
// Setting a netns should be done from a dedicated OS thread.
|
|
//
|
|
// goroutines are not really OS threads, we try to mimic the issue by
|
|
// locking the OS thread to this goroutine
|
|
//
|
|
runtime.LockOSThread()
|
|
defer runtime.UnlockOSThread()
|
|
|
|
oldnetns, err := netns.Get()
|
|
|
|
if err != nil {
|
|
logger.Log.Errorf("Unable to get netns of current thread %v", err)
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
if err := netns.Set(nsh); err != nil {
|
|
logger.Log.Errorf("Unable to set netns of pid %v - %v", pid, err)
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
name := fmt.Sprintf("netns-%v-%v", pid, interfaceName)
|
|
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
|
|
|
|
if err != nil {
|
|
logger.Log.Errorf("Error listening to PID %v - %v", pid, err)
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
if err := netns.Set(oldnetns); err != nil {
|
|
logger.Log.Errorf("Unable to set back netns of current thread %v", err)
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
done <- src
|
|
}(done)
|
|
|
|
select {
|
|
case err := <-errors:
|
|
return nil, err
|
|
case source := <-done:
|
|
return source, nil
|
|
}
|
|
}
|
|
|
|
func (m *PacketSourceManager) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) {
|
|
for _, src := range m.sources {
|
|
go src.readPackets(ipdefrag, packets)
|
|
}
|
|
}
|
|
|
|
func (m *PacketSourceManager) Close() {
|
|
for _, src := range m.sources {
|
|
src.close()
|
|
}
|
|
}
|