Add golangConnection struct and implement pollGolangReadWrite method

This commit is contained in:
M. Mert Yildiran 2022-05-30 19:10:16 +03:00
parent 2dc40d5976
commit b91ad2c76c
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
7 changed files with 192 additions and 16 deletions

View File

@ -118,6 +118,7 @@ require (
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/ugorji/go/codec v1.2.6 // indirect
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 // indirect
github.com/wk8/go-ordered-map v1.0.0 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.starlark.net v0.0.0-20220203230714-bb14e151c28f // indirect

View File

@ -704,6 +704,8 @@ github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695AP
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/wI2L/jsondiff v0.1.1 h1:r2TkoEet7E4JMO5+s1RCY2R0LrNPNHY6hbDeow2hRHw=
github.com/wI2L/jsondiff v0.1.1/go.mod h1:bAbJSAJXZtfOCZ5y3v7Mfb6UQa3DGdGFjQj1cNv8EcM=
github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8=
github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=

View File

@ -15,6 +15,7 @@ require (
github.com/up9inc/mizu/tap/api v0.0.0
github.com/up9inc/mizu/tap/dbgctl v0.0.0
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
github.com/wk8/go-ordered-map v1.0.0
k8s.io/api v0.23.3
)

View File

@ -132,6 +132,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY=
@ -142,6 +143,8 @@ github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8=
github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

View File

@ -0,0 +1,27 @@
package tlstapper
type golangConnection struct {
Pid uint32
ConnAddr uint32
AddressPair addressPair
Request []byte
Response []byte
GotRequest bool
GotResponse bool
}
func NewGolangConnection(pid uint32, connAddr uint32) *golangConnection {
return &golangConnection{
Pid: pid,
ConnAddr: connAddr,
}
}
func (c *golangConnection) setAddressBySockfd(procfs string, pid uint32, fd uint32) error {
addrPair, err := getAddressBySockfd(procfs, pid, fd)
if err != nil {
return err
}
c.AddressPair = addrPair
return nil
}

View File

@ -4,8 +4,10 @@ import (
"bufio"
"bytes"
"fmt"
"log"
"sync"
"time"
"unsafe"
"encoding/binary"
"encoding/hex"
@ -14,26 +16,33 @@ import (
"strings"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/go-errors/errors"
"github.com/hashicorp/golang-lru/simplelru"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
orderedmap "github.com/wk8/go-ordered-map"
)
const fdCachedItemAvgSize = 40
const fdCacheMaxItems = 500000 / fdCachedItemAvgSize
const (
fdCachedItemAvgSize = 40
fdCacheMaxItems = 500000 / fdCachedItemAvgSize
golangMapLimit = 1 << 10 // 1024
)
type tlsPoller struct {
tls *TlsTapper
readers map[string]*tlsReader
closedReaders chan string
reqResMatcher api.RequestResponseMatcher
chunksReader *perf.Reader
extension *api.Extension
procfs string
pidToNamespace sync.Map
fdCache *simplelru.LRU // Actual typs is map[string]addressPair
evictedCounter int
tls *TlsTapper
readers map[string]*tlsReader
closedReaders chan string
reqResMatcher api.RequestResponseMatcher
chunksReader *perf.Reader
golangReader *ringbuf.Reader
golangReadWriteMap *orderedmap.OrderedMap
extension *api.Extension
procfs string
pidToNamespace sync.Map
fdCache *simplelru.LRU // Actual typs is map[string]addressPair
evictedCounter int
}
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) (*tlsPoller, error) {
@ -66,6 +75,12 @@ func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
return errors.Wrap(err, 0)
}
p.golangReader, err = ringbuf.NewReader(bpfObjects.GolangReadWrites)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
@ -73,7 +88,7 @@ func (p *tlsPoller) close() error {
return p.chunksReader.Close()
}
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
func (p *tlsPoller) pollLibssl(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
chunks := make(chan *tlsChunk)
go p.pollChunksPerfBuffer(chunks)
@ -94,6 +109,116 @@ func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptio
}
}
func (p *tlsPoller) pollGolang(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
go p.pollGolangReadWrite(p.golangReader, emitter, options, streamsMap)
}
func (p *tlsPoller) pollGolangReadWrite(rd *ringbuf.Reader, emitter api.Emitter, options *api.TrafficFilteringOptions,
streamsMap api.TcpStreamMap) {
nativeEndian := p.getByteOrder()
// tlsTapperGolangReadWrite is generated by bpf2go.
var b tlsTapperGolangReadWrite
for {
record, err := rd.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
log.Println("received signal, exiting..")
return
}
log.Printf("reading from reader: %s", err)
continue
}
// Parse the ringbuf event entry into a tlsTapperGolangReadWrite structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), nativeEndian, &b); err != nil {
log.Printf("parsing ringbuf event: %s", err)
continue
}
if p.golangReadWriteMap.Len()+1 > golangMapLimit {
pair := p.golangReadWriteMap.Oldest()
p.golangReadWriteMap.Delete(pair.Key)
}
pid := uint64(b.Pid)
identifier := pid<<32 + uint64(b.ConnAddr)
var connection *golangConnection
var _connection interface{}
var ok bool
if _connection, ok = p.golangReadWriteMap.Get(identifier); !ok {
connection = NewGolangConnection(b.Pid, b.ConnAddr)
p.golangReadWriteMap.Set(identifier, connection)
} else {
connection = _connection.(*golangConnection)
}
if b.IsRequest {
err := connection.setAddressBySockfd(p.procfs, b.Pid, b.Fd)
if err != nil {
log.Printf("Error resolving address pair from fd: %s", err)
continue
}
connection.Request = make([]byte, len(b.Data[:]))
copy(connection.Request, b.Data[:])
connection.GotRequest = true
} else {
connection.Response = make([]byte, len(b.Data[:]))
copy(connection.Response, b.Data[:])
connection.GotResponse = true
}
if connection.GotRequest && connection.GotResponse {
tcpid := p.buildTcpId(&connection.AddressPair)
tlsEmitter := &tlsEmitter{
delegate: emitter,
namespace: p.getNamespace(b.Pid),
}
reader := &tlsReader{
chunks: make(chan *tlsChunk, 1),
progress: &api.ReadProgress{},
tcpID: &tcpid,
isClient: true,
captureTime: time.Now(),
extension: p.extension,
emitter: tlsEmitter,
counterPair: &api.CounterPair{},
reqResMatcher: p.reqResMatcher,
}
stream := &tlsStream{
reader: reader,
}
streamsMap.Store(streamsMap.NextId(), stream)
reader.parent = stream
err := p.extension.Dissector.Dissect(bufio.NewReader(bytes.NewReader(connection.Request)), reader, options)
if err != nil {
logger.Log.Warningf("Error dissecting TLS %v - %v", reader.GetTcpID(), err)
}
reader.isClient = false
reader.tcpID = &api.TcpID{
SrcIP: reader.tcpID.DstIP,
DstIP: reader.tcpID.SrcIP,
SrcPort: reader.tcpID.DstPort,
DstPort: reader.tcpID.SrcPort,
}
err = p.extension.Dissector.Dissect(bufio.NewReader(bytes.NewReader(connection.Response)), reader, options)
if err != nil {
logger.Log.Warningf("Error dissecting TLS %v - %v", reader.GetTcpID(), err)
}
}
}
}
func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
logger.Log.Infof("Start polling for tls events")
@ -162,7 +287,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, address *addressPair, key
emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions,
streamsMap api.TcpStreamMap) *tlsReader {
tcpid := p.buildTcpId(chunk, address)
tcpid := p.buildTcpId(address)
doneHandler := func(r *tlsReader) {
p.closeReader(key, r)
@ -252,7 +377,7 @@ func buildTlsKey(address addressPair) string {
return fmt.Sprintf("%s:%d>%s:%d", address.srcIp, address.srcPort, address.dstIp, address.dstPort)
}
func (p *tlsPoller) buildTcpId(chunk *tlsChunk, address *addressPair) api.TcpID {
func (p *tlsPoller) buildTcpId(address *addressPair) api.TcpID {
return api.TcpID{
SrcIP: address.srcIp.String(),
DstIP: address.dstIp.String(),
@ -319,3 +444,19 @@ func (p *tlsPoller) fdCacheEvictCallback(key interface{}, value interface{}) {
logger.Log.Infof("Tls fdCache evicted %d items", p.evictedCounter)
}
}
func (p *tlsPoller) getByteOrder() (byteOrder binary.ByteOrder) {
buf := [2]byte{}
*(*uint16)(unsafe.Pointer(&buf[0])) = uint16(0xABCD)
switch buf {
case [2]byte{0xCD, 0xAB}:
byteOrder = binary.LittleEndian
case [2]byte{0xAB, 0xCD}:
byteOrder = binary.BigEndian
default:
panic("Could not determine native endianness.")
}
return
}

View File

@ -59,7 +59,8 @@ func (t *TlsTapper) Init(chunksBufferSize int, logBufferSize int, procfs string,
}
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
t.poller.poll(emitter, options, streamsMap)
t.poller.pollLibssl(emitter, options, streamsMap)
t.poller.pollGolang(emitter, options, streamsMap)
}
func (t *TlsTapper) PollForLogging() {