Compare commits

...

4 Commits

Author SHA1 Message Date
M. Mert Yıldıran
d844d6eb04 Upgrade to Basenine v0.6.3, add xml and time helpers, make xml and json helpers available in redact helper (#891)
* Upgrade to Basenine `v0.6.2`, add `xml` helper, make `xml` and `json` helpers available in `redact` helper

* URL encode the query

* Upgrade to Basenine `v0.6.3`
2022-03-15 12:21:25 +03:00
David Levanon
6979441422 tls missing addresses (#825)
* stream seen file descriptors from ebpf

* re-generate bpf object files

* fixing pr comments
2022-03-14 15:40:27 +02:00
David Levanon
9ec8347c6c set bpf filter for pcap (#865)
* set bpf filter for pcap

* implement pod updating mechanism

* Update tap/source/netns_packet_source.go

* Update tap/source/netns_packet_source.go

* minor pr fixes

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
2022-03-14 15:35:49 +02:00
Igor Gov
617fb89ca5 Build custom branch Github action (#890)
* Build custom branch github action #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* .
2022-03-14 13:15:28 +02:00
18 changed files with 456 additions and 255 deletions

View File

@@ -0,0 +1,44 @@
name: Build Custom Branch
on: push
concurrency:
group: custom-branch-build-${{ github.ref }}
cancel-in-progress: true
jobs:
build:
name: Push custom branch image to GCR
runs-on: ubuntu-latest
if: ${{ contains(github.event.head_commit.message, '#build_and_publish_custom_image') }}
steps:
- name: Check out the repo
uses: actions/checkout@v2
- id: 'auth'
uses: 'google-github-actions/auth@v0'
with:
credentials_json: '${{ secrets.GCR_JSON_KEY }}'
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'
- name: Get base image name
shell: bash
run: echo "##[set-output name=image;]$(echo gcr.io/up9-docker-hub/mizu/${GITHUB_REF#refs/heads/})"
id: base_image_step
- name: Login to GCR
uses: docker/login-action@v1
with:
registry: gcr.io
username: _json_key
password: ${{ secrets.GCR_JSON_KEY }}
- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
push: true
tags: ${{ steps.base_image_step.outputs.image }}:latest

View File

@@ -78,8 +78,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
-X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent . -X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum # Download Basenine executable, verify the sha1sum
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH} ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256 ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256 RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256
RUN chmod +x ./basenine_linux_${GOARCH} RUN chmod +x ./basenine_linux_${GOARCH}
RUN mv ./basenine_linux_${GOARCH} ./basenine RUN mv ./basenine_linux_${GOARCH} ./basenine

View File

@@ -22,7 +22,7 @@ require (
github.com/ory/kratos-client-go v0.8.2-alpha.1 github.com/ory/kratos-client-go v0.8.2-alpha.1
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0 github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e
github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0 github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -853,14 +853,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ= github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1 h1:0XN8s3HtwUBr9hbWRAFulFMsu1f2cabfJbwpz/sOoLA= github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e h1:/9dFXqvRDHcwPQdIGHP6iz6M0iAWBPOxYf6C+Ntq5w0=
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI= github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e h1:nv/A/AeF8PcU91aHAj6o2cU8fl/46v0ZLj7wgIKjv+o=
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500 h1:T1QHxt65NMete/GobVSvcHnwZAQibvahhrMTCgtnSS4=
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0 h1:mSqZuJJV4UZyaAoC8x7/AO7DLidlXepFyU18Vm3rFiA=
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=

View File

@@ -11,7 +11,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/spf13/cobra v1.3.0 github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0 github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e
github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
@@ -35,6 +35,7 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/alecthomas/participle/v2 v2.0.0-alpha7 // indirect github.com/alecthomas/participle/v2 v2.0.0-alpha7 // indirect
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
github.com/clbanning/mxj/v2 v2.5.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.4.0 // indirect github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect

View File

@@ -120,6 +120,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/mxj/v2 v2.5.5 h1:oT81vUeEiQQ/DcHbzSytRngP6Ky9O+L+0Bw0zSJag9E=
github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@@ -598,8 +600,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0 h1:9PQamOq285DyVsRlS4KB/x2+xkr5QlpiT9Y/BPutS4A= github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e h1:reG/QwyxdfvGObfdrae7DZc3rTMiGwQ6S/4PRkwtBoE=
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:R9bG4y/iq89jNC0xZ25uKDqenyKFTR3X9acGDOkKWSE= github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk= github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=

View File

@@ -7,6 +7,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"os" "os"
"sort" "sort"
@@ -18,6 +19,9 @@ import (
const mizuTestEnvVar = "MIZU_TEST" const mizuTestEnvVar = "MIZU_TEST"
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
var UnknownPort uint16 = 0
type Protocol struct { type Protocol struct {
Name string `json:"name"` Name string `json:"name"`
LongName string `json:"longName"` LongName string `json:"longName"`

View File

@@ -52,7 +52,6 @@ var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use") var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode") var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data") var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls") var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
var tls = flag.Bool("tls", false, "Enable TLS tapper") var tls = flag.Bool("tls", false, "Enable TLS tapper")
@@ -190,7 +189,7 @@ func initializePacketSources() error {
} }
var err error var err error
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil { if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
return err return err
} else { } else {
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan) packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
@@ -248,7 +247,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
tls := tlstapper.TlsTapper{} tls := tlstapper.TlsTapper{}
tlsPerfBufferSize := os.Getpagesize() * 100 tlsPerfBufferSize := os.Getpagesize() * 100
if err := tls.Init(tlsPerfBufferSize); err != nil { if err := tls.Init(tlsPerfBufferSize, *procfs, extension); err != nil {
tlstapper.LogError(err) tlstapper.LogError(err)
return return
} }
@@ -272,6 +271,5 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
OutputChannel: outputItems, OutputChannel: outputItems,
} }
poller := tlstapper.NewTlsPoller(&tls, extension) go tls.Poll(emitter, options)
go poller.Poll(extension, emitter, options)
} }

View File

@@ -0,0 +1,83 @@
package source
import (
"fmt"
"runtime"
"github.com/up9inc/mizu/shared/logger"
"github.com/vishvananda/netns"
)
func newNetnsPacketSource(procfs string, pid string,
interfaceName string, behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
nsh, err := netns.GetFromPath(fmt.Sprintf("%s/%s/ns/net", procfs, pid))
if err != nil {
logger.Log.Errorf("Unable to get netns of pid %s - %w", pid, err)
return nil, err
}
src, err := newPacketSourceFromNetnsHandle(pid, nsh, interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error starting netns packet source for %s - %w", pid, err)
return nil, err
}
return src, nil
}
func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
done := make(chan *tcpPacketSource)
errors := make(chan error)
go func(done chan<- *tcpPacketSource) {
// Setting a netns should be done from a dedicated OS thread.
//
// goroutines are not really OS threads, we try to mimic the issue by
// locking the OS thread to this goroutine
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %w", err)
errors <- err
return
}
if err := netns.Set(nsh); err != nil {
logger.Log.Errorf("Unable to set netns of pid %s - %w", pid, err)
errors <- err
return
}
name := fmt.Sprintf("netns-%s-%s", pid, interfaceName)
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error listening to PID %s - %w", pid, err)
errors <- err
return
}
if err := netns.Set(oldnetns); err != nil {
logger.Log.Errorf("Unable to set back netns of current thread %w", err)
errors <- err
return
}
done <- src
}(done)
select {
case err := <-errors:
return nil, err
case source := <-done:
return source, nil
}
}

View File

@@ -2,109 +2,46 @@ package source
import ( import (
"fmt" "fmt"
"runtime"
"strconv"
"strings" "strings"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
"github.com/vishvananda/netns"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
) )
const bpfFilterMaxPods = 150
const hostSourcePid = "0"
type PacketSourceManager struct { type PacketSourceManager struct {
sources []*tcpPacketSource sources map[string]*tcpPacketSource
} }
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string, func NewPacketSourceManager(procfs string, filename string, interfaceName string,
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) { mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
sources := make([]*tcpPacketSource, 0) hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour) sourceManager := &PacketSourceManager{
sources = createSourcesFromEnvoy(sources, mtls, procfs, pods, interfaceName, behaviour) sources: map[string]*tcpPacketSource{
sources = createSourcesFromLinkerd(sources, mtls, procfs, pods, interfaceName, behaviour) hostSourcePid: hostSource,
},
return &PacketSourceManager{
sources: sources,
}, nil
}
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return sources, err
} }
return append(sources, hostSource), nil sourceManager.UpdatePods(mtls, procfs, pods, interfaceName, behaviour)
} return sourceManager, nil
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if pids == "" {
return sources
}
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromEnvoy(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !mtls {
return sources
}
envoyPids, err := discoverRelevantEnvoyPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromLinkerd(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !mtls {
return sources
}
linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, linkerdPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
} }
func newHostPacketSource(filename string, interfaceName string, func newHostPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) { behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
var name string var name string
if filename == "" { if filename == "" {
name = fmt.Sprintf("host-%v", interfaceName) name = fmt.Sprintf("host-%s", interfaceName)
} else { } else {
name = fmt.Sprintf("file-%v", filename) name = fmt.Sprintf("file-%s", filename)
} }
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour) source, err := newTcpPacketSource(name, filename, interfaceName, behaviour)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -112,90 +49,93 @@ func newHostPacketSource(filename string, interfaceName string,
return source, nil return source, nil
} }
func newNetnsPacketSources(procfs string, pids []string, interfaceName string, func (m *PacketSourceManager) UpdatePods(mtls bool, procfs string, pods []v1.Pod,
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource { interfaceName string, behaviour TcpPacketSourceBehaviour) {
result := make([]*tcpPacketSource, 0) if mtls {
m.updateMtlsPods(procfs, pods, interfaceName, behaviour)
for _, pidstr := range pids {
pid, err := strconv.Atoi(pidstr)
if err != nil {
logger.Log.Errorf("Invalid PID: %v - %v", pid, err)
continue
}
nsh, err := netns.GetFromPath(fmt.Sprintf("%v/%v/ns/net", procfs, pid))
if err != nil {
logger.Log.Errorf("Unable to get netns of pid %v - %v", pid, err)
continue
}
src, err := newNetnsPacketSource(pid, nsh, interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error starting netns packet source for %v - %v", pid, err)
continue
}
result = append(result, src)
} }
return result m.setBPFFilter(pods)
} }
func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string, func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) { interfaceName string, behaviour TcpPacketSourceBehaviour) {
done := make(chan *tcpPacketSource) relevantPids := m.getRelevantPids(procfs, pods)
errors := make(chan error) logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
go func(done chan<- *tcpPacketSource) { for pid, src := range m.sources {
// Setting a netns should be done from a dedicated OS thread. if _, ok := relevantPids[pid]; !ok {
// src.close()
// goroutines are not really OS threads, we try to mimic the issue by delete(m.sources, pid)
// locking the OS thread to this goroutine
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %v", err)
errors <- err
return
} }
}
if err := netns.Set(nsh); err != nil { for pid := range relevantPids {
logger.Log.Errorf("Unable to set netns of pid %v - %v", pid, err) if _, ok := m.sources[pid]; !ok {
errors <- err source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour)
return
if err == nil {
m.sources[pid] = source
}
} }
}
}
name := fmt.Sprintf("netns-%v-%v", pid, interfaceName) func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]bool {
src, err := newTcpPacketSource(name, "", interfaceName, behaviour) relevantPids := make(map[string]bool)
relevantPids[hostSourcePid] = true
if err != nil { if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
logger.Log.Errorf("Error listening to PID %v - %v", pid, err) logger.Log.Warningf("Unable to discover envoy pids - %w", err)
errors <- err } else {
return for _, pid := range envoyPids {
relevantPids[pid] = true
} }
}
if err := netns.Set(oldnetns); err != nil { if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
logger.Log.Errorf("Unable to set back netns of current thread %v", err) logger.Log.Warningf("Unable to discover linkerd pids - %w", err)
errors <- err } else {
return for _, pid := range linkerdPids {
relevantPids[pid] = true
} }
}
done <- src return relevantPids
}(done) }
select { func buildBPFExpr(pods []v1.Pod) string {
case err := <-errors: hostsFilter := make([]string, 0)
return nil, err
case source := <-done: for _, pod := range pods {
return source, nil hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", pod.Status.PodIP))
}
return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
}
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
if len(pods) == 0 {
logger.Log.Info("No pods provided, skipping pcap bpf filter")
return
}
var expr string
if len(pods) > bpfFilterMaxPods {
logger.Log.Info("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
expr = "port not 443"
} else {
expr = buildBPFExpr(pods)
}
logger.Log.Infof("Setting pcap bpf filter %s", expr)
for pid, src := range m.sources {
if err := src.setBPFFilter(expr); err != nil {
logger.Log.Warningf("Error setting bpf filter for %s %v - %w", pid, src, err)
}
} }
} }

View File

@@ -98,6 +98,14 @@ func newTcpPacketSource(name, filename string, interfaceName string,
return result, nil return result, nil
} }
func (source *tcpPacketSource) String() string {
return source.name
}
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
return source.handle.SetBPFFilter(expr)
}
func (source *tcpPacketSource) close() { func (source *tcpPacketSource) close() {
if source.handle != nil { if source.handle != nil {
source.handle.Close() source.handle.Close()

View File

@@ -10,7 +10,7 @@ Copyright (C) UP9 Inc.
#define FLAGS_IS_CLIENT_BIT (1 << 0) #define FLAGS_IS_CLIENT_BIT (1 << 0)
#define FLAGS_IS_READ_BIT (1 << 1) #define FLAGS_IS_READ_BIT (1 << 1)
// The same struct can be found in Chunk.go // The same struct can be found in chunk.go
// //
// Be careful when editing, alignment and padding should be exactly the same in go/c. // Be careful when editing, alignment and padding should be exactly the same in go/c.
// //

View File

@@ -8,20 +8,20 @@ import (
"github.com/go-errors/errors" "github.com/go-errors/errors"
) )
const FLAGS_IS_CLIENT_BIT int32 = (1 << 0) const FLAGS_IS_CLIENT_BIT uint32 = (1 << 0)
const FLAGS_IS_READ_BIT int32 = (1 << 1) const FLAGS_IS_READ_BIT uint32 = (1 << 1)
// The same struct can be found in maps.h // The same struct can be found in maps.h
// //
// Be careful when editing, alignment and padding should be exactly the same in go/c. // Be careful when editing, alignment and padding should be exactly the same in go/c.
// //
type tlsChunk struct { type tlsChunk struct {
Pid int32 Pid uint32
Tgid int32 Tgid uint32
Len int32 Len uint32
Recorded int32 Recorded uint32
Fd int32 Fd uint32
Flags int32 Flags uint32
Address [16]byte Address [16]byte
Data [4096]byte Data [4096]byte
} }
@@ -68,3 +68,7 @@ func (c *tlsChunk) isWrite() bool {
func (c *tlsChunk) getRecordedData() []byte { func (c *tlsChunk) getRecordedData() []byte {
return c.Data[:c.Recorded] return c.Data[:c.Recorded]
} }
func (c *tlsChunk) isRequest() bool {
return (c.isClient() && c.isWrite()) || (c.isServer() && c.isRead())
}

View File

@@ -0,0 +1,102 @@
package tlstapper
import (
"fmt"
"io/ioutil"
"net"
"os"
"regexp"
"strconv"
"strings"
"github.com/go-errors/errors"
)
var socketInodeRegex = regexp.MustCompile(`socket:\[(\d+)\]`)
const (
SRC_ADDRESS_FILED_INDEX = 1
DST_ADDRESS_FILED_INDEX = 2
INODE_FILED_INDEX = 9
)
// This file helps to extract Ip and Port out of a Socket file descriptor.
//
// The equivalent bash commands are:
//
// > ls -l /proc/<pid>/fd/<fd>
// Output something like "socket:[1234]" for sockets - 1234 is the inode of the socket
// > cat /proc/<pid>/net/tcp | grep <inode>
// Output a line per ipv4 socket, the 9th field is the inode of the socket
// The 1st and 2nd fields are the source and dest ip and ports in a Hex format
// 0100007F:50 is 127.0.0.1:80
func getAddressBySockfd(procfs string, pid uint32, fd uint32, src bool) (net.IP, uint16, error) {
inode, err := getSocketInode(procfs, pid, fd)
if err != nil {
return nil, 0, err
}
tcppath := fmt.Sprintf("%s/%d/net/tcp", procfs, pid)
tcp, err := ioutil.ReadFile(tcppath)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
for _, line := range strings.Split(string(tcp), "\n") {
parts := strings.Fields(line)
if len(parts) < 10 {
continue
}
if inode == parts[INODE_FILED_INDEX] {
if src {
return parseHexAddress(parts[SRC_ADDRESS_FILED_INDEX])
} else {
return parseHexAddress(parts[DST_ADDRESS_FILED_INDEX])
}
}
}
return nil, 0, errors.Errorf("address not found [pid: %d] [sockfd: %d] [inode: %s]", pid, fd, inode)
}
func getSocketInode(procfs string, pid uint32, fd uint32) (string, error) {
fdlinkPath := fmt.Sprintf("%s/%d/fd/%d", procfs, pid, fd)
fdlink, err := os.Readlink(fdlinkPath)
if err != nil {
return "", errors.Wrap(err, 0)
}
tokens := socketInodeRegex.FindStringSubmatch(fdlink)
if tokens == nil || len(tokens) < 1 {
return "", errors.Errorf("socket inode not found [pid: %d] [sockfd: %d] [link: %s]", pid, fd, fdlink)
}
return tokens[1], nil
}
// Format looks like 0100007F:50 for 127.0.0.1:80
//
func parseHexAddress(addr string) (net.IP, uint16, error) {
addrParts := strings.Split(addr, ":")
port, err := strconv.ParseUint(addrParts[1], 16, 16)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
ip, err := strconv.ParseUint(addrParts[0], 16, 32)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
return net.IP{uint8(ip), uint8(ip >> 8), uint8(ip >> 16), uint8(ip >> 24)}, uint16(port), nil
}

View File

@@ -2,7 +2,6 @@ package tlstapper
import ( import (
"debug/elf" "debug/elf"
"fmt"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
@@ -47,7 +46,7 @@ func findBaseAddress(sslElf *elf.File, sslLibraryPath string) (uint64, error) {
} }
} }
return 0, errors.New(fmt.Sprintf("Program header not found in %v", sslLibraryPath)) return 0, errors.Errorf("Program header not found in %v", sslLibraryPath)
} }
func findSslOffsets(sslElf *elf.File, base uint64) (sslOffsets, error) { func findSslOffsets(sslElf *elf.File, base uint64) (sslOffsets, error) {

View File

@@ -2,43 +2,64 @@ package tlstapper
import ( import (
"bufio" "bufio"
"bytes"
"fmt" "fmt"
"net" "net"
"encoding/binary"
"encoding/hex" "encoding/hex"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"github.com/cilium/ebpf/perf"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
const UNKNOWN_PORT uint16 = 80
const UNKNOWN_HOST string = "127.0.0.1"
type tlsPoller struct { type tlsPoller struct {
tls *TlsTapper tls *TlsTapper
readers map[string]*tlsReader readers map[string]*tlsReader
closedReaders chan string closedReaders chan string
reqResMatcher api.RequestResponseMatcher reqResMatcher api.RequestResponseMatcher
chunksReader *perf.Reader
extension *api.Extension
procfs string
} }
func NewTlsPoller(tls *TlsTapper, extension *api.Extension) *tlsPoller { func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
return &tlsPoller{ return &tlsPoller{
tls: tls, tls: tls,
readers: make(map[string]*tlsReader), readers: make(map[string]*tlsReader),
closedReaders: make(chan string, 100), closedReaders: make(chan string, 100),
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(), reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
extension: extension,
chunksReader: nil,
procfs: procfs,
} }
} }
func (p *tlsPoller) Poll(extension *api.Extension, func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
emitter api.Emitter, options *api.TrafficFilteringOptions) { var err error
p.chunksReader, err = perf.NewReader(bpfObjects.ChunksBuffer, bufferSize)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (p *tlsPoller) close() error {
return p.chunksReader.Close()
}
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
chunks := make(chan *tlsChunk) chunks := make(chan *tlsChunk)
go p.tls.pollPerf(chunks) go p.pollChunksPerfBuffer(chunks)
for { for {
select { select {
@@ -47,7 +68,7 @@ func (p *tlsPoller) Poll(extension *api.Extension,
return return
} }
if err := p.handleTlsChunk(chunk, extension, emitter, options); err != nil { if err := p.handleTlsChunk(chunk, p.extension, emitter, options); err != nil {
LogError(err) LogError(err)
} }
case key := <-p.closedReaders: case key := <-p.closedReaders:
@@ -56,6 +77,41 @@ func (p *tlsPoller) Poll(extension *api.Extension,
} }
} }
func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
logger.Log.Infof("Start polling for tls events")
for {
record, err := p.chunksReader.Read()
if err != nil {
close(chunks)
if errors.Is(err, perf.ErrClosed) {
return
}
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
return
}
if record.LostSamples != 0 {
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
continue
}
buffer := bytes.NewReader(record.RawSample)
var chunk tlsChunk
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
LogError(errors.Errorf("Error parsing chunk %v", err))
continue
}
chunks <- &chunk
}
}
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension, func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
emitter api.Emitter, options *api.TrafficFilteringOptions) error { emitter api.Emitter, options *api.TrafficFilteringOptions) error {
ip, port, err := chunk.getAddress() ip, port, err := chunk.getAddress()
@@ -75,7 +131,7 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
reader.chunks <- chunk reader.chunks <- chunk
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" { if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
logTls(chunk, ip, port) p.logTls(chunk, ip, port)
} }
return nil return nil
@@ -92,10 +148,9 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
}, },
} }
isRequest := (chunk.isClient() && chunk.isWrite()) || (chunk.isServer() && chunk.isRead()) tcpid := p.buildTcpId(chunk, ip, port)
tcpid := buildTcpId(isRequest, ip, port)
go dissect(extension, reader, isRequest, &tcpid, emitter, options, p.reqResMatcher) go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher)
return reader return reader
} }
@@ -120,27 +175,36 @@ func buildTlsKey(chunk *tlsChunk, ip net.IP, port uint16) string {
return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port) return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port)
} }
func buildTcpId(isRequest bool, ip net.IP, port uint16) api.TcpID { func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpID {
if isRequest { myIp, myPort, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, chunk.isClient())
if err != nil {
// May happen if the socket already closed, very likely to happen for localhost
//
myIp = api.UnknownIp
myPort = api.UnknownPort
}
if chunk.isRequest() {
return api.TcpID{ return api.TcpID{
SrcIP: UNKNOWN_HOST, SrcIP: myIp.String(),
DstIP: ip.String(), DstIP: ip.String(),
SrcPort: strconv.Itoa(int(UNKNOWN_PORT)), SrcPort: strconv.FormatUint(uint64(myPort), 10),
DstPort: strconv.FormatInt(int64(port), 10), DstPort: strconv.FormatUint(uint64(port), 10),
Ident: "", Ident: "",
} }
} else { } else {
return api.TcpID{ return api.TcpID{
SrcIP: ip.String(), SrcIP: ip.String(),
DstIP: UNKNOWN_HOST, DstIP: myIp.String(),
SrcPort: strconv.FormatInt(int64(port), 10), SrcPort: strconv.FormatUint(uint64(port), 10),
DstPort: strconv.Itoa(int(UNKNOWN_PORT)), DstPort: strconv.FormatUint(uint64(myPort), 10),
Ident: "", Ident: "",
} }
} }
} }
func logTls(chunk *tlsChunk, ip net.IP, port uint16) { func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
var flagsStr string var flagsStr string
if chunk.isClient() { if chunk.isClient() {
@@ -155,8 +219,13 @@ func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
flagsStr += "W" flagsStr += "W"
} }
srcIp, srcPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, true)
dstIp, dstPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, false)
str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "") str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "")
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (recorded %v out of %v) - %v - %v", logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (fdaddr %v:%v>%v:%v) (recorded %v out of %v) - %v - %v",
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port, chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded])) chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port,
srcIp, srcPort, dstIp, dstPort,
chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
} }

View File

@@ -1,13 +1,10 @@
package tlstapper package tlstapper
import ( import (
"bytes"
"encoding/binary"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/rlimit" "github.com/cilium/ebpf/rlimit"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
) )
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go tlsTapper bpf/tls_tapper.c -- -O2 -g -D__TARGET_ARCH_x86 //go:generate go run github.com/cilium/ebpf/cmd/bpf2go tlsTapper bpf/tls_tapper.c -- -O2 -g -D__TARGET_ARCH_x86
@@ -16,10 +13,10 @@ type TlsTapper struct {
bpfObjects tlsTapperObjects bpfObjects tlsTapperObjects
syscallHooks syscallHooks syscallHooks syscallHooks
sslHooksStructs []sslHooks sslHooksStructs []sslHooks
reader *perf.Reader poller *tlsPoller
} }
func (t *TlsTapper) Init(bufferSize int) error { func (t *TlsTapper) Init(bufferSize int, procfs string, extension *api.Extension) error {
logger.Log.Infof("Initializing tls tapper (bufferSize: %v)", bufferSize) logger.Log.Infof("Initializing tls tapper (bufferSize: %v)", bufferSize)
if err := setupRLimit(); err != nil { if err := setupRLimit(); err != nil {
@@ -27,55 +24,23 @@ func (t *TlsTapper) Init(bufferSize int) error {
} }
t.bpfObjects = tlsTapperObjects{} t.bpfObjects = tlsTapperObjects{}
if err := loadTlsTapperObjects(&t.bpfObjects, nil); err != nil { if err := loadTlsTapperObjects(&t.bpfObjects, nil); err != nil {
return errors.Wrap(err, 0) return errors.Wrap(err, 0)
} }
t.syscallHooks = syscallHooks{} t.syscallHooks = syscallHooks{}
if err := t.syscallHooks.installSyscallHooks(&t.bpfObjects); err != nil { if err := t.syscallHooks.installSyscallHooks(&t.bpfObjects); err != nil {
return err return err
} }
t.sslHooksStructs = make([]sslHooks, 0) t.sslHooksStructs = make([]sslHooks, 0)
return t.initChunksReader(bufferSize) t.poller = newTlsPoller(t, extension, procfs)
return t.poller.init(&t.bpfObjects, bufferSize)
} }
func (t *TlsTapper) pollPerf(chunks chan<- *tlsChunk) { func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
logger.Log.Infof("Start polling for tls events") t.poller.poll(emitter, options)
for {
record, err := t.reader.Read()
if err != nil {
close(chunks)
if errors.Is(err, perf.ErrClosed) {
return
}
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
return
}
if record.LostSamples != 0 {
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
continue
}
buffer := bytes.NewReader(record.RawSample)
var chunk tlsChunk
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
LogError(errors.Errorf("Error parsing chunk %v", err))
continue
}
chunks <- &chunk
}
} }
func (t *TlsTapper) GlobalTap(sslLibrary string) error { func (t *TlsTapper) GlobalTap(sslLibrary string) error {
@@ -118,7 +83,7 @@ func (t *TlsTapper) Close() []error {
errors = append(errors, sslHooks.close()...) errors = append(errors, sslHooks.close()...)
} }
if err := t.reader.Close(); err != nil { if err := t.poller.close(); err != nil {
errors = append(errors, err) errors = append(errors, err)
} }
@@ -135,18 +100,6 @@ func setupRLimit() error {
return nil return nil
} }
func (t *TlsTapper) initChunksReader(bufferSize int) error {
var err error
t.reader, err = perf.NewReader(t.bpfObjects.ChunksBuffer, bufferSize)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error { func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary) logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary)

View File

@@ -52,12 +52,12 @@ export default class Api {
} }
getEntry = async (id, query) => { getEntry = async (id, query) => {
const response = await this.client.get(`/entries/${id}?query=${query}`); const response = await this.client.get(`/entries/${id}?query=${encodeURIComponent(query)}`);
return response.data; return response.data;
} }
fetchEntries = async (leftOff, direction, query, limit, timeoutMs) => { fetchEntries = async (leftOff, direction, query, limit, timeoutMs) => {
const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${query}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) { const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${encodeURIComponent(query)}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) {
console.error(thrown.message); console.error(thrown.message);
return {}; return {};
}); });