mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-01 16:57:28 +00:00
Add support to auto discover envoy processes (#459)
* discover envoy pids using cluster ips * add istio flag to cli + rename mtls flag to istio * add istio.md to docs * Fixing typos * Fix minor typos and grammer in docs Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
This commit is contained in:
parent
b77ea63f42
commit
6caa94f08f
@ -94,17 +94,17 @@ func main() {
|
||||
panic("API server address must be provided with --api-server-address when using --tap")
|
||||
}
|
||||
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tapTargets := getTapTargets()
|
||||
if tapTargets != nil {
|
||||
tap.SetFilterAuthorities(tapTargets)
|
||||
logger.Log.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
|
||||
tapOpts.FilterAuthorities = tapTargets
|
||||
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
|
||||
}
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
filteringOptions := getTrafficFilteringOptions()
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
|
||||
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
|
||||
if err != nil {
|
||||
@ -443,6 +443,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e
|
||||
IgnoredUserAgents: config.Config.IgnoredUserAgents,
|
||||
MizuApiFilteringOptions: config.Config.MizuApiFilteringOptions,
|
||||
MizuServiceAccountExists: true, //assume service account exists since daemon mode will not function without it anyway
|
||||
Istio: config.Config.Istio,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
@ -120,4 +120,5 @@ func init() {
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file path with policy rules")
|
||||
tapCmd.Flags().String(configStructs.ContractFile, defaultTapConfig.ContractFile, "OAS/Swagger file to validate to monitor the contracts")
|
||||
tapCmd.Flags().Bool(configStructs.DaemonModeTapName, defaultTapConfig.DaemonMode, "Run mizu in daemon mode, detached from the cli")
|
||||
tapCmd.Flags().Bool(configStructs.IstioName, defaultTapConfig.Istio, "Record decrypted traffic if the cluster configured with istio and mtls")
|
||||
}
|
||||
|
@ -214,6 +214,7 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
|
||||
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
|
||||
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
||||
MizuServiceAccountExists: state.mizuServiceAccountExists,
|
||||
Istio: config.Config.Tap.Istio,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
@ -402,6 +402,7 @@ func getMizuAgentConfig(targetNamespaces []string, mizuApiFilteringOptions *api.
|
||||
MizuResourcesNamespace: Config.MizuResourcesNamespace,
|
||||
MizuApiFilteringOptions: *mizuApiFilteringOptions,
|
||||
AgentDatabasePath: shared.DataDirPath,
|
||||
Istio: Config.Tap.Istio,
|
||||
}
|
||||
return &config, nil
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ const (
|
||||
EnforcePolicyFile = "traffic-validation-file"
|
||||
ContractFile = "contract"
|
||||
DaemonModeTapName = "daemon"
|
||||
IstioName = "istio"
|
||||
)
|
||||
|
||||
type TapConfig struct {
|
||||
@ -48,6 +49,7 @@ type TapConfig struct {
|
||||
TapperResources shared.Resources `yaml:"tapper-resources"`
|
||||
DaemonMode bool `yaml:"daemon" default:"false"`
|
||||
NoPersistentVolumeClaim bool `yaml:"no-persistent-volume-claim" default:"false"`
|
||||
Istio bool `yaml:"istio" default:"false"`
|
||||
}
|
||||
|
||||
func (config *TapConfig) PodRegex() *regexp.Regexp {
|
||||
|
46
docs/ISTIO.md
Normal file
46
docs/ISTIO.md
Normal file
@ -0,0 +1,46 @@
|
||||

|
||||
# Istio mutual tls (mtls) with Mizu
|
||||
This document describe how Mizu tapper handles workloads configured with mtls, making the internal traffic between services in a cluster to be encrypted.
|
||||
|
||||
Besides Istio there are other service meshes that implement mtls. However, as of now Istio is the most used one, and this is why we are focusing on it.
|
||||
|
||||
In order to create an Istio setup for development, follow those steps:
|
||||
1. Deploy a sample application to a Kubernetes cluster, the sample application needs to make internal service to service calls
|
||||
2. SSH to one of the nodes, and run `tcpdump`
|
||||
3. Make sure you see the internal service to service calls in a plain text
|
||||
4. Deploy Istio to the cluster - make sure it is attached to all pods of the sample application, and that it is configured with mtls (default)
|
||||
5. Run `tcpdump` again, make sure you don't see the internal service to service calls in a plain text
|
||||
|
||||
## The connection between Istio and Envoy
|
||||
In order to implement its service mesh capabilities, [Istio](https://istio.io) use an [Envoy](https://www.envoyproxy.io) sidecar in front of every pod in the cluster. The Envoy is responsible for the mtls communication, and that's why we are focusing on Envoy proxy.
|
||||
|
||||
In the future we might see more players in that field, then we'll have to either add support for each of them or go with a unified eBPF solution.
|
||||
|
||||
## Network namespaces
|
||||
A [linux network namespace](https://man7.org/linux/man-pages/man7/network_namespaces.7.html) is an isolation that limit the process view of the network. In the container world it used to isolate one container from another. In the Kubernetes world it used to isolate a pod from another. That means that two containers running on the same pod share the same network namespace. A container can reach a container in the same pod by accessing `localhost`.
|
||||
|
||||
An Envoy proxy configured with mtls receives the inbound traffic directed to the pod, decrypts it and sends it via `localhost` to the target container.
|
||||
|
||||
## Tapping mtls traffic
|
||||
In order for Mizu to be able to see the decrypted traffic it needs to listen on the same network namespace of the target pod. Multiple threads of the same process can have different network namespaces.
|
||||
|
||||
[gopacket](https://github.com/google/gopacket) uses [libpacp](https://github.com/the-tcpdump-group/libpcap) by default for capturing the traffic. Libpacap doesn't support network namespaces and we can't ask it to listen to traffic on a different namespace. However, we can change the network namespace of the calling thread and then start libpcap to see the traffic on a different namespace.
|
||||
|
||||
## Finding the network namespace of a running process
|
||||
The network namespace of a running process can be found in `/proc/PID/ns/net` link. Once we have this link, we can ask Linux to change the network namespace of a thread to this one.
|
||||
|
||||
This mean that Mizu needs to have access to the `/proc` (procfs) of the running node.
|
||||
|
||||
## Finding the network namespace of a running pod
|
||||
In order for Mizu to be able to listen to mtls traffic, it needs to get the PIDs of the the running pods, filter them according to the user filters and then start listen to their internal network namespace traffic.
|
||||
|
||||
There is no official way in Kubernetes to get from pod to PID. The CRI implementation purposefully doesn't force a pod to be a processes on the host. It can be a Virtual Machine as well like [Kata containers](https://katacontainers.io)
|
||||
|
||||
While we can provide a solution for various CRIs (like Docker, Containerd and CRI-O) it's better to have a unified solution. In order to achieve that, Mizu scans all the processes in the host, and finds the Envoy processes using their `/proc/PID/exe` link.
|
||||
|
||||
Once Mizu detects an Envoy process, it need to check whether this specific Envoy process is relevant according the user filters. The user filters are a list of `CLUSTER_IPS`. The tapper gets them via the `TapOpts.FilterAuthorities` list.
|
||||
|
||||
Istio sends an `INSTANCE_IP` environment variable to every Envoy proxy process. By examining the Envoy process's environment variables we can see whether it's relevant or not. Examining a process environment variables is done by reading the `/proc/PID/envion` file.
|
||||
|
||||
## Edge cases
|
||||
The method we use to find Envoy processes and correlate them to the cluster IPs may be inaccurate in certain situations. If, for example, a user runs an Envoy process manually, and set its `INSTANCE_IP` environment variable to one of the `CLUSTER_IPS` the tapper gets, then Mizu will capture traffic for it.
|
@ -43,6 +43,7 @@ type TapperSyncerConfig struct {
|
||||
IgnoredUserAgents []string
|
||||
MizuApiFilteringOptions api.TrafficFilteringOptions
|
||||
MizuServiceAccountExists bool
|
||||
Istio bool
|
||||
}
|
||||
|
||||
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig) (*MizuTapperSyncer, error) {
|
||||
@ -222,6 +223,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
tapperSyncer.config.ImagePullPolicy,
|
||||
tapperSyncer.config.MizuApiFilteringOptions,
|
||||
tapperSyncer.config.LogLevel,
|
||||
tapperSyncer.config.Istio,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -612,7 +612,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level) error {
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, istio bool) error {
|
||||
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
|
||||
|
||||
if len(nodeToTappedPodIPMap) == 0 {
|
||||
@ -635,7 +635,10 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
"--tap",
|
||||
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
|
||||
"--nodefrag",
|
||||
"--procfs", procfsMountPath,
|
||||
}
|
||||
|
||||
if istio {
|
||||
mizuCmd = append(mizuCmd, "--procfs", procfsMountPath, "--istio")
|
||||
}
|
||||
|
||||
agentContainer := applyconfcore.Container()
|
||||
|
@ -43,6 +43,7 @@ type MizuAgentConfig struct {
|
||||
MizuResourcesNamespace string `json:"mizuResourceNamespace"`
|
||||
MizuApiFilteringOptions api.TrafficFilteringOptions `json:"mizuApiFilteringOptions"`
|
||||
AgentDatabasePath string `json:"agentDatabasePath"`
|
||||
Istio bool `json:"istio"`
|
||||
}
|
||||
|
||||
type WebSocketMessageMetadata struct {
|
||||
|
@ -50,14 +50,15 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
|
||||
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
|
||||
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
|
||||
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
|
||||
var istio = flag.Bool("istio", false, "Record decrypted traffic if the cluster configured with istio and mtls")
|
||||
|
||||
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
||||
|
||||
type TapOpts struct {
|
||||
HostMode bool
|
||||
HostMode bool
|
||||
FilterAuthorities []string
|
||||
}
|
||||
|
||||
var hostMode bool // global
|
||||
var extensions []*api.Extension // global
|
||||
var filteringOptions *api.TrafficFilteringOptions // global
|
||||
|
||||
@ -80,15 +81,18 @@ func inArrayString(arr []string, valueToCheck string) bool {
|
||||
}
|
||||
|
||||
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
|
||||
hostMode = opts.HostMode
|
||||
extensions = extensionsRef
|
||||
filteringOptions = options
|
||||
|
||||
if opts.FilterAuthorities == nil {
|
||||
opts.FilterAuthorities = []string{}
|
||||
}
|
||||
|
||||
if GetMemoryProfilingEnabled() {
|
||||
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
|
||||
}
|
||||
|
||||
go startPassiveTapper(outputItems)
|
||||
go startPassiveTapper(opts, outputItems)
|
||||
}
|
||||
|
||||
func printPeriodicStats(cleaner *Cleaner) {
|
||||
@ -131,7 +135,7 @@ func printPeriodicStats(cleaner *Cleaner) {
|
||||
}
|
||||
}
|
||||
|
||||
func initializePacketSources() (*source.PacketSourceManager, error) {
|
||||
func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error) {
|
||||
var bpffilter string
|
||||
if len(flag.Args()) > 0 {
|
||||
bpffilter = strings.Join(flag.Args(), " ")
|
||||
@ -146,17 +150,17 @@ func initializePacketSources() (*source.PacketSourceManager, error) {
|
||||
BpfFilter: bpffilter,
|
||||
}
|
||||
|
||||
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, behaviour)
|
||||
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, opts.FilterAuthorities, behaviour)
|
||||
}
|
||||
|
||||
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
|
||||
streamsMap := NewTcpStreamMap()
|
||||
go streamsMap.closeTimedoutTcpStreamChannels()
|
||||
|
||||
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
||||
diagnose.InitializeTapperInternalStats()
|
||||
|
||||
sources, err := initializePacketSources()
|
||||
sources, err := initializePacketSources(opts)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Fatal(err)
|
||||
@ -169,7 +173,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
}
|
||||
|
||||
packets := make(chan source.TcpPacketInfo)
|
||||
assembler := NewTcpAssembler(outputItems, streamsMap)
|
||||
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
|
||||
|
||||
diagnose.AppStats.SetStartTime(time.Now())
|
||||
|
||||
|
@ -18,24 +18,6 @@ const (
|
||||
TcpStreamChannelTimeoutMsDefaultValue = 10000
|
||||
)
|
||||
|
||||
type globalSettings struct {
|
||||
filterAuthorities []string
|
||||
}
|
||||
|
||||
var gSettings = &globalSettings{
|
||||
filterAuthorities: []string{},
|
||||
}
|
||||
|
||||
func SetFilterAuthorities(ipAddresses []string) {
|
||||
gSettings.filterAuthorities = ipAddresses
|
||||
}
|
||||
|
||||
func GetFilterIPs() []string {
|
||||
addresses := make([]string, len(gSettings.filterAuthorities))
|
||||
copy(addresses, gSettings.filterAuthorities)
|
||||
return addresses
|
||||
}
|
||||
|
||||
func GetMaxBufferedPagesTotal() int {
|
||||
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxBufferedPagesTotalEnvVarName))
|
||||
if err != nil {
|
||||
|
112
tap/source/envoy_discoverer.go
Normal file
112
tap/source/envoy_discoverer.go
Normal file
@ -0,0 +1,112 @@
|
||||
package source
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
const envoyBinary = "/envoy"
|
||||
|
||||
var numberRegex = regexp.MustCompile("[0-9]+")
|
||||
|
||||
func discoverRelevantEnvoyPids(procfs string, clusterIps []string) ([]string, error) {
|
||||
result := make([]string, 0)
|
||||
|
||||
pids, err := ioutil.ReadDir(procfs)
|
||||
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
logger.Log.Infof("Starting envoy auto discoverer %v %v - scanning %v potential pids",
|
||||
procfs, clusterIps, len(pids))
|
||||
|
||||
for _, pid := range pids {
|
||||
if !pid.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
if !numberRegex.MatchString(pid.Name()) {
|
||||
continue
|
||||
}
|
||||
|
||||
if checkPid(procfs, pid.Name(), clusterIps) {
|
||||
result = append(result, pid.Name())
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log.Infof("Found %v relevant envoy processes - %v", len(result), result)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func checkPid(procfs string, pid string, clusterIps []string) bool {
|
||||
execLink := fmt.Sprintf("%v/%v/exe", procfs, pid)
|
||||
exec, err := os.Readlink(execLink)
|
||||
|
||||
if err != nil {
|
||||
// Debug on purpose - it may happen due to many reasons and we only care
|
||||
// for it during troubleshooting
|
||||
//
|
||||
logger.Log.Debugf("Unable to read link %v - %v\n", execLink, err)
|
||||
return false
|
||||
}
|
||||
|
||||
if !strings.HasSuffix(exec, envoyBinary) {
|
||||
return false
|
||||
}
|
||||
|
||||
environmentFile := fmt.Sprintf("%v/%v/environ", procfs, pid)
|
||||
clusterIp, err := readEnvironmentVariable(environmentFile, "INSTANCE_IP")
|
||||
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if clusterIp == "" {
|
||||
logger.Log.Debugf("Found an envoy process without INSTANCE_IP variable %v\n", pid)
|
||||
return false
|
||||
}
|
||||
|
||||
logger.Log.Infof("Found envoy pid %v with cluster ip %v", pid, clusterIp)
|
||||
|
||||
for _, value := range clusterIps {
|
||||
if value == clusterIp {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func readEnvironmentVariable(file string, name string) (string, error) {
|
||||
bytes, err := ioutil.ReadFile(file)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Error reading environment file %v - %v", file, err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
envs := strings.Split(string(bytes), string([]byte{0}))
|
||||
|
||||
for _, env := range envs {
|
||||
if !strings.Contains(env, "=") {
|
||||
continue
|
||||
}
|
||||
|
||||
parts := strings.Split(env, "=")
|
||||
varName := parts[0]
|
||||
value := parts[1]
|
||||
|
||||
if name == varName {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
@ -15,26 +15,63 @@ type PacketSourceManager struct {
|
||||
}
|
||||
|
||||
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
||||
istio bool, clusterIps []string, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
||||
sources := make([]*tcpPacketSource, 0)
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sources = append(sources, hostSource)
|
||||
|
||||
if pids != "" {
|
||||
netnsSources := newNetnsPacketSources(procfs, pids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
}
|
||||
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
|
||||
sources = createSourcesFromEnvoy(sources, istio, procfs, clusterIps, interfaceName, behaviour)
|
||||
|
||||
return &PacketSourceManager{
|
||||
sources: sources,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return sources, err
|
||||
}
|
||||
|
||||
return append(sources, hostSource), nil
|
||||
}
|
||||
|
||||
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if pids == "" {
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
return sources
|
||||
}
|
||||
|
||||
func createSourcesFromEnvoy(sources []*tcpPacketSource, istio bool, procfs string, clusterIps []string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if !istio {
|
||||
return sources
|
||||
}
|
||||
|
||||
envoyPids, err := discoverRelevantEnvoyPids(procfs, clusterIps)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
|
||||
return sources
|
||||
}
|
||||
|
||||
func newHostPacketSource(filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
var name string
|
||||
@ -54,11 +91,11 @@ func newHostPacketSource(filename string, interfaceName string,
|
||||
return source, nil
|
||||
}
|
||||
|
||||
func newNetnsPacketSources(procfs string, pids string, interfaceName string,
|
||||
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
result := make([]*tcpPacketSource, 0)
|
||||
|
||||
for _, pidstr := range strings.Split(pids, ",") {
|
||||
for _, pidstr := range pids {
|
||||
pid, err := strconv.Atoi(pidstr)
|
||||
|
||||
if err != nil {
|
||||
@ -100,9 +137,9 @@ func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
|
||||
//
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
|
||||
oldnetns, err := netns.Get()
|
||||
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of current thread %v", err)
|
||||
errors <- err
|
||||
|
@ -33,13 +33,13 @@ func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
return c.CaptureInfo
|
||||
}
|
||||
|
||||
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler {
|
||||
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap, opts *TapOpts) *tcpAssembler {
|
||||
var emitter api.Emitter = &api.Emitting{
|
||||
AppStats: &diagnose.AppStats,
|
||||
OutputChannel: outputItems,
|
||||
}
|
||||
|
||||
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
|
||||
streamFactory := NewTcpStreamFactory(emitter, streamsMap, opts)
|
||||
streamPool := reassembly.NewStreamPool(streamFactory)
|
||||
assembler := reassembly.NewAssembler(streamPool)
|
||||
|
||||
|
@ -24,6 +24,7 @@ type tcpStreamFactory struct {
|
||||
Emitter api.Emitter
|
||||
streamsMap *tcpStreamMap
|
||||
ownIps []string
|
||||
opts *TapOpts
|
||||
}
|
||||
|
||||
type tcpStreamWrapper struct {
|
||||
@ -31,7 +32,7 @@ type tcpStreamWrapper struct {
|
||||
createdAt time.Time
|
||||
}
|
||||
|
||||
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStreamFactory {
|
||||
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
|
||||
var ownIps []string
|
||||
|
||||
if localhostIPs, err := getLocalhostIPs(); err != nil {
|
||||
@ -47,6 +48,7 @@ func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStre
|
||||
Emitter: emitter,
|
||||
streamsMap: streamsMap,
|
||||
ownIps: ownIps,
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,17 +141,17 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
|
||||
}
|
||||
|
||||
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
|
||||
if hostMode {
|
||||
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
||||
if factory.opts.HostMode {
|
||||
if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if inArrayString(gSettings.filterAuthorities, dstIP) {
|
||||
} else if inArrayString(factory.opts.FilterAuthorities, dstIP) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
||||
} else if inArrayString(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||
} else if inArrayString(gSettings.filterAuthorities, srcIP) {
|
||||
} else if inArrayString(factory.opts.FilterAuthorities, srcIP) {
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user