mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-18 07:42:20 +00:00
Feature/fix tls not listening (#1046)
* avoid chunks with invalid address * tls tapper should distict between pids * prettfy tls verbose log and tls key * support tls from multi threads + duplicate calls to the same target * introduce fdCache and user address pair as tls key * remove unused comment * fix merge conflicts * use lru for fdcache * pr fixes - renaming * fix conflict issue
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -16,10 +15,14 @@ import (
|
||||
|
||||
"github.com/cilium/ebpf/perf"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/hashicorp/golang-lru/simplelru"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
const fdCachedItemAvgSize = 40
|
||||
const fdCacheMaxItems = 500000 / fdCachedItemAvgSize
|
||||
|
||||
type tlsPoller struct {
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
@@ -29,10 +32,12 @@ type tlsPoller struct {
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
pidToNamespace sync.Map
|
||||
fdCache *simplelru.LRU // Actual typs is map[string]addressPair
|
||||
evictedCounter int
|
||||
}
|
||||
|
||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
||||
return &tlsPoller{
|
||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) (*tlsPoller, error) {
|
||||
poller := &tlsPoller{
|
||||
tls: tls,
|
||||
readers: make(map[string]*tlsReader),
|
||||
closedReaders: make(chan string, 100),
|
||||
@@ -41,6 +46,15 @@ func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsP
|
||||
chunksReader: nil,
|
||||
procfs: procfs,
|
||||
}
|
||||
|
||||
fdCache, err := simplelru.NewLRU(fdCacheMaxItems, poller.fdCacheEvictCallback)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
poller.fdCache = fdCache
|
||||
return poller, nil
|
||||
}
|
||||
|
||||
func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
|
||||
@@ -117,35 +131,38 @@ func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
|
||||
|
||||
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension, emitter api.Emitter,
|
||||
options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) error {
|
||||
ip, port, err := chunk.getAddress()
|
||||
address, err := p.getSockfdAddressPair(chunk)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
address, err = chunk.getAddressPair()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
key := buildTlsKey(chunk, ip, port)
|
||||
key := buildTlsKey(address)
|
||||
reader, exists := p.readers[key]
|
||||
|
||||
if !exists {
|
||||
reader = p.startNewTlsReader(chunk, ip, port, key, emitter, extension, options, streamsMap)
|
||||
reader = p.startNewTlsReader(chunk, &address, key, emitter, extension, options, streamsMap)
|
||||
p.readers[key] = reader
|
||||
}
|
||||
|
||||
reader.captureTime = time.Now()
|
||||
reader.chunks <- chunk
|
||||
reader.newChunk(chunk)
|
||||
|
||||
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
|
||||
p.logTls(chunk, ip, port)
|
||||
p.logTls(chunk, key, reader)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string,
|
||||
func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, address *addressPair, key string,
|
||||
emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions,
|
||||
streamsMap api.TcpStreamMap) *tlsReader {
|
||||
|
||||
tcpid := p.buildTcpId(chunk, ip, port)
|
||||
tcpid := p.buildTcpId(chunk, address)
|
||||
|
||||
doneHandler := func(r *tlsReader) {
|
||||
p.closeReader(key, r)
|
||||
@@ -197,36 +214,52 @@ func (p *tlsPoller) closeReader(key string, r *tlsReader) {
|
||||
p.closedReaders <- key
|
||||
}
|
||||
|
||||
func buildTlsKey(chunk *tlsChunk, ip net.IP, port uint16) string {
|
||||
return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port)
|
||||
}
|
||||
func (p *tlsPoller) getSockfdAddressPair(chunk *tlsChunk) (addressPair, error) {
|
||||
address, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd)
|
||||
fdCacheKey := fmt.Sprintf("%d:%d", chunk.Pid, chunk.Fd)
|
||||
|
||||
func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpID {
|
||||
myIp, myPort, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, chunk.isClient())
|
||||
|
||||
if err != nil {
|
||||
// May happen if the socket already closed, very likely to happen for localhost
|
||||
//
|
||||
myIp = api.UnknownIp
|
||||
myPort = api.UnknownPort
|
||||
if err == nil {
|
||||
if !chunk.isRequest() {
|
||||
switchedAddress := addressPair{
|
||||
srcIp: address.dstIp,
|
||||
srcPort: address.dstPort,
|
||||
dstIp: address.srcIp,
|
||||
dstPort: address.srcPort,
|
||||
}
|
||||
p.fdCache.Add(fdCacheKey, switchedAddress)
|
||||
return switchedAddress, nil
|
||||
} else {
|
||||
p.fdCache.Add(fdCacheKey, address)
|
||||
return address, nil
|
||||
}
|
||||
}
|
||||
|
||||
if chunk.isRequest() {
|
||||
return api.TcpID{
|
||||
SrcIP: myIp.String(),
|
||||
DstIP: ip.String(),
|
||||
SrcPort: strconv.FormatUint(uint64(myPort), 10),
|
||||
DstPort: strconv.FormatUint(uint64(port), 10),
|
||||
Ident: "",
|
||||
}
|
||||
} else {
|
||||
return api.TcpID{
|
||||
SrcIP: ip.String(),
|
||||
DstIP: myIp.String(),
|
||||
SrcPort: strconv.FormatUint(uint64(port), 10),
|
||||
DstPort: strconv.FormatUint(uint64(myPort), 10),
|
||||
Ident: "",
|
||||
}
|
||||
fromCacheIfc, ok := p.fdCache.Get(fdCacheKey)
|
||||
|
||||
if !ok {
|
||||
return addressPair{}, err
|
||||
}
|
||||
|
||||
fromCache, ok := fromCacheIfc.(addressPair)
|
||||
|
||||
if !ok {
|
||||
return address, errors.Errorf("Unable to cast %T to addressPair", fromCacheIfc)
|
||||
}
|
||||
|
||||
return fromCache, nil
|
||||
}
|
||||
|
||||
func buildTlsKey(address addressPair) string {
|
||||
return fmt.Sprintf("%s:%d>%s:%d", address.srcIp, address.srcPort, address.dstIp, address.dstPort)
|
||||
}
|
||||
|
||||
func (p *tlsPoller) buildTcpId(chunk *tlsChunk, address *addressPair) api.TcpID {
|
||||
return api.TcpID{
|
||||
SrcIP: address.srcIp.String(),
|
||||
DstIP: address.dstIp.String(),
|
||||
SrcPort: strconv.FormatUint(uint64(address.srcPort), 10),
|
||||
DstPort: strconv.FormatUint(uint64(address.dstPort), 10),
|
||||
Ident: "",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -257,7 +290,7 @@ func (p *tlsPoller) clearPids() {
|
||||
})
|
||||
}
|
||||
|
||||
func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
func (p *tlsPoller) logTls(chunk *tlsChunk, key string, reader *tlsReader) {
|
||||
var flagsStr string
|
||||
|
||||
if chunk.isClient() {
|
||||
@@ -272,13 +305,18 @@ func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
flagsStr += "W"
|
||||
}
|
||||
|
||||
srcIp, srcPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, true)
|
||||
dstIp, dstPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, false)
|
||||
|
||||
str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "")
|
||||
|
||||
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (fdaddr %v:%v>%v:%v) (recorded %v out of %v starting at %v) - %v - %v",
|
||||
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port,
|
||||
srcIp, srcPort, dstIp, dstPort,
|
||||
chunk.Recorded, chunk.Len, chunk.Start, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||
logger.Log.Infof("[%-44s] %s #%-4d (fd: %d) (recorded %d/%d:%d) - %s - %s",
|
||||
key, flagsStr, reader.seenChunks, chunk.Fd,
|
||||
chunk.Recorded, chunk.Len, chunk.Start,
|
||||
str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||
}
|
||||
|
||||
func (p *tlsPoller) fdCacheEvictCallback(key interface{}, value interface{}) {
|
||||
p.evictedCounter = p.evictedCounter + 1
|
||||
|
||||
if p.evictedCounter%1000000 == 0 {
|
||||
logger.Log.Infof("Tls fdCache evicted %d items", p.evictedCounter)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user