mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-19 00:49:35 +00:00
Fetch source and destination addresses with bpf from tcp kprobes, similar to how it is done for openssl lib. Chunk contains both source address and destination address. FD is no longer used to obtain addresses.
282 lines
6.4 KiB
Go
282 lines
6.4 KiB
Go
package tlstapper
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/cilium/ebpf/perf"
|
|
"github.com/go-errors/errors"
|
|
"github.com/hashicorp/golang-lru/simplelru"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
const (
|
|
fdCachedItemAvgSize = 40
|
|
fdCacheMaxItems = 500000 / fdCachedItemAvgSize
|
|
)
|
|
|
|
type tlsPoller struct {
|
|
tls *TlsTapper
|
|
readers map[string]*tlsReader
|
|
closedReaders chan string
|
|
reqResMatcher api.RequestResponseMatcher
|
|
chunksReader *perf.Reader
|
|
extension *api.Extension
|
|
procfs string
|
|
pidToNamespace sync.Map
|
|
fdCache *simplelru.LRU // Actual type is map[string]addressPair
|
|
evictedCounter int
|
|
}
|
|
|
|
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) (*tlsPoller, error) {
|
|
poller := &tlsPoller{
|
|
tls: tls,
|
|
readers: make(map[string]*tlsReader),
|
|
closedReaders: make(chan string, 100),
|
|
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
|
|
extension: extension,
|
|
chunksReader: nil,
|
|
procfs: procfs,
|
|
}
|
|
|
|
fdCache, err := simplelru.NewLRU(fdCacheMaxItems, poller.fdCacheEvictCallback)
|
|
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, 0)
|
|
}
|
|
|
|
poller.fdCache = fdCache
|
|
return poller, nil
|
|
}
|
|
|
|
func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
|
|
var err error
|
|
|
|
p.chunksReader, err = perf.NewReader(bpfObjects.ChunksBuffer, bufferSize)
|
|
|
|
if err != nil {
|
|
return errors.Wrap(err, 0)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *tlsPoller) close() error {
|
|
return p.chunksReader.Close()
|
|
}
|
|
|
|
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
|
|
// tlsTapperTlsChunk is generated by bpf2go.
|
|
chunks := make(chan *tlsTapperTlsChunk)
|
|
|
|
go p.pollChunksPerfBuffer(chunks)
|
|
|
|
for {
|
|
select {
|
|
case chunk, ok := <-chunks:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if err := p.handleTlsChunk(chunk, p.extension, emitter, options, streamsMap); err != nil {
|
|
LogError(err)
|
|
}
|
|
case key := <-p.closedReaders:
|
|
delete(p.readers, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsTapperTlsChunk) {
|
|
logger.Log.Infof("Start polling for tls events")
|
|
|
|
for {
|
|
record, err := p.chunksReader.Read()
|
|
|
|
if err != nil {
|
|
close(chunks)
|
|
|
|
if errors.Is(err, perf.ErrClosed) {
|
|
return
|
|
}
|
|
|
|
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
|
|
return
|
|
}
|
|
|
|
if record.LostSamples != 0 {
|
|
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
|
|
continue
|
|
}
|
|
|
|
buffer := bytes.NewReader(record.RawSample)
|
|
|
|
var chunk tlsTapperTlsChunk
|
|
|
|
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
|
|
LogError(errors.Errorf("Error parsing chunk %v", err))
|
|
continue
|
|
}
|
|
|
|
chunks <- &chunk
|
|
}
|
|
}
|
|
|
|
func (p *tlsPoller) handleTlsChunk(chunk *tlsTapperTlsChunk, extension *api.Extension, emitter api.Emitter,
|
|
options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) error {
|
|
address := chunk.getAddressPair()
|
|
|
|
key := buildTlsKey(address)
|
|
reader, exists := p.readers[key]
|
|
|
|
if !exists {
|
|
reader = p.startNewTlsReader(chunk, &address, key, emitter, extension, options, streamsMap)
|
|
p.readers[key] = reader
|
|
}
|
|
|
|
reader.newChunk(chunk)
|
|
|
|
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
|
|
p.logTls(chunk, key, reader)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *tlsPoller) startNewTlsReader(chunk *tlsTapperTlsChunk, address *addressPair, key string,
|
|
emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions,
|
|
streamsMap api.TcpStreamMap) *tlsReader {
|
|
|
|
tcpid := p.buildTcpId(address)
|
|
|
|
doneHandler := func(r *tlsReader) {
|
|
p.closeReader(key, r)
|
|
}
|
|
|
|
tlsEmitter := &tlsEmitter{
|
|
delegate: emitter,
|
|
namespace: p.getNamespace(chunk.Pid),
|
|
}
|
|
|
|
reader := &tlsReader{
|
|
key: key,
|
|
chunks: make(chan *tlsTapperTlsChunk, 1),
|
|
doneHandler: doneHandler,
|
|
progress: &api.ReadProgress{},
|
|
tcpID: &tcpid,
|
|
isClient: chunk.isRequest(),
|
|
captureTime: time.Now(),
|
|
extension: extension,
|
|
emitter: tlsEmitter,
|
|
counterPair: &api.CounterPair{},
|
|
reqResMatcher: p.reqResMatcher,
|
|
}
|
|
|
|
stream := &tlsStream{
|
|
reader: reader,
|
|
}
|
|
streamsMap.Store(streamsMap.NextId(), stream)
|
|
|
|
reader.parent = stream
|
|
|
|
go dissect(extension, reader, options)
|
|
return reader
|
|
}
|
|
|
|
func dissect(extension *api.Extension, reader api.TcpReader, options *api.TrafficFilteringOptions) {
|
|
b := bufio.NewReader(reader)
|
|
|
|
err := extension.Dissector.Dissect(b, reader, options)
|
|
|
|
if err != nil {
|
|
logger.Log.Warningf("Error dissecting TLS %v - %v", reader.GetTcpID(), err)
|
|
}
|
|
}
|
|
|
|
func (p *tlsPoller) closeReader(key string, r *tlsReader) {
|
|
close(r.chunks)
|
|
p.closedReaders <- key
|
|
}
|
|
|
|
func buildTlsKey(address addressPair) string {
|
|
return fmt.Sprintf("%s:%d>%s:%d", address.srcIp, address.srcPort, address.dstIp, address.dstPort)
|
|
}
|
|
|
|
func (p *tlsPoller) buildTcpId(address *addressPair) api.TcpID {
|
|
return api.TcpID{
|
|
SrcIP: address.srcIp.String(),
|
|
DstIP: address.dstIp.String(),
|
|
SrcPort: strconv.FormatUint(uint64(address.srcPort), 10),
|
|
DstPort: strconv.FormatUint(uint64(address.dstPort), 10),
|
|
Ident: "",
|
|
}
|
|
}
|
|
|
|
func (p *tlsPoller) addPid(pid uint32, namespace string) {
|
|
p.pidToNamespace.Store(pid, namespace)
|
|
}
|
|
|
|
func (p *tlsPoller) getNamespace(pid uint32) string {
|
|
namespaceIfc, ok := p.pidToNamespace.Load(pid)
|
|
|
|
if !ok {
|
|
return api.UnknownNamespace
|
|
}
|
|
|
|
namespace, ok := namespaceIfc.(string)
|
|
|
|
if !ok {
|
|
return api.UnknownNamespace
|
|
}
|
|
|
|
return namespace
|
|
}
|
|
|
|
func (p *tlsPoller) clearPids() {
|
|
p.pidToNamespace.Range(func(key, v interface{}) bool {
|
|
p.pidToNamespace.Delete(key)
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (p *tlsPoller) logTls(chunk *tlsTapperTlsChunk, key string, reader *tlsReader) {
|
|
var flagsStr string
|
|
|
|
if chunk.isClient() {
|
|
flagsStr = "C"
|
|
} else {
|
|
flagsStr = "S"
|
|
}
|
|
|
|
if chunk.isRead() {
|
|
flagsStr += "R"
|
|
} else {
|
|
flagsStr += "W"
|
|
}
|
|
|
|
str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "")
|
|
|
|
logger.Log.Infof("[%-44s] %s #%-4d (fd: %d) (recorded %d/%d:%d) - %s - %s",
|
|
key, flagsStr, reader.seenChunks, chunk.Fd,
|
|
chunk.Recorded, chunk.Len, chunk.Start,
|
|
str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
|
}
|
|
|
|
func (p *tlsPoller) fdCacheEvictCallback(key interface{}, value interface{}) {
|
|
p.evictedCounter = p.evictedCounter + 1
|
|
|
|
if p.evictedCounter%1000000 == 0 {
|
|
logger.Log.Infof("Tls fdCache evicted %d items", p.evictedCounter)
|
|
}
|
|
}
|