diff --git a/agent/go.mod b/agent/go.mod index b1cd82e82..9564468c6 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -76,6 +76,7 @@ require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/agent/go.sum b/agent/go.sum index 499f9dd19..624535717 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -405,6 +405,7 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= diff --git a/tap/go.mod b/tap/go.mod index 150fe1477..86bfa6edc 100644 --- a/tap/go.mod +++ b/tap/go.mod @@ -18,6 +18,7 @@ require ( github.com/google/go-cmp v0.5.7 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/martian v2.1.0+incompatible // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect diff --git a/tap/go.sum b/tap/go.sum index 455fecead..53b77414e 100644 --- a/tap/go.sum +++ b/tap/go.sum @@ -71,6 +71,8 @@ github.com/googleapis/gnostic v0.5.1/go.mod h1:6U4PtQXGIEt/Z3h5MAT7FNofLnw9vXk2c github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97DwqyJO1AENw9kA= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= diff --git a/tap/tlstapper/bpf/openssl_uprobes.c b/tap/tlstapper/bpf/openssl_uprobes.c index a80107511..d4efa3392 100644 --- a/tap/tlstapper/bpf/openssl_uprobes.c +++ b/tap/tlstapper/bpf/openssl_uprobes.c @@ -48,14 +48,14 @@ static __always_inline int get_count_bytes(struct pt_regs *ctx, struct ssl_info* return countBytes; } -static __always_inline void add_address_to_chunk(struct pt_regs *ctx, struct tlsChunk* chunk, __u64 id, __u32 fd) { +static __always_inline int add_address_to_chunk(struct pt_regs *ctx, struct tlsChunk* chunk, __u64 id, __u32 fd) { __u32 pid = id >> 32; __u64 key = (__u64) pid << 32 | fd; struct fd_info *fdinfo = bpf_map_lookup_elem(&file_descriptor_to_ipv4, &key); if (fdinfo == NULL) { - return; + return 0; } int err = bpf_probe_read(chunk->address, sizeof(chunk->address), fdinfo->ipv4_addr); @@ -63,7 +63,10 @@ static __always_inline void add_address_to_chunk(struct pt_regs *ctx, struct tls if (err != 0) { log_error(ctx, LOG_ERROR_READING_FD_ADDRESS, id, err, 0l); + return 0; } + + return 1; } static __always_inline void send_chunk_part(struct pt_regs *ctx, __u8* buffer, __u64 id, @@ -143,7 +146,12 @@ static __always_inline void output_ssl_chunk(struct pt_regs *ctx, struct ssl_inf chunk->len = countBytes; chunk->fd = info->fd; - add_address_to_chunk(ctx, chunk, id, chunk->fd); + if (!add_address_to_chunk(ctx, chunk, id, chunk->fd)) { + // Without an address, we drop the chunk because there is not much to do with it in Go + // + return; + } + send_chunk(ctx, info->buffer, id, chunk); } diff --git a/tap/tlstapper/chunk.go b/tap/tlstapper/chunk.go index 403a3fcdc..2ffc153aa 100644 --- a/tap/tlstapper/chunk.go +++ b/tap/tlstapper/chunk.go @@ -6,6 +6,7 @@ import ( "net" "github.com/go-errors/errors" + "github.com/up9inc/mizu/tap/api" ) const FLAGS_IS_CLIENT_BIT uint32 = (1 << 0) @@ -73,3 +74,27 @@ func (c *tlsChunk) getRecordedData() []byte { func (c *tlsChunk) isRequest() bool { return (c.isClient() && c.isWrite()) || (c.isServer() && c.isRead()) } + +func (c *tlsChunk) getAddressPair() (addressPair, error) { + ip, port, err := c.getAddress() + + if err != nil { + return addressPair{}, err + } + + if c.isRequest() { + return addressPair{ + srcIp: api.UnknownIp, + srcPort: api.UnknownPort, + dstIp: ip, + dstPort: port, + }, nil + } else { + return addressPair{ + srcIp: ip, + srcPort: port, + dstIp: api.UnknownIp, + dstPort: api.UnknownPort, + }, nil + } +} diff --git a/tap/tlstapper/sockfd_info.go b/tap/tlstapper/sockfd_info.go index 3b41d93fd..c1f078c94 100644 --- a/tap/tlstapper/sockfd_info.go +++ b/tap/tlstapper/sockfd_info.go @@ -20,10 +20,17 @@ const ( INODE_FILED_INDEX = 9 ) +type addressPair struct { + srcIp net.IP + srcPort uint16 + dstIp net.IP + dstPort uint16 +} + // This file helps to extract Ip and Port out of a Socket file descriptor. -// +// // The equivalent bash commands are: -// +// // > ls -l /proc//fd/ // Output something like "socket:[1234]" for sockets - 1234 is the inode of the socket // > cat /proc//net/tcp | grep @@ -31,18 +38,18 @@ const ( // The 1st and 2nd fields are the source and dest ip and ports in a Hex format // 0100007F:50 is 127.0.0.1:80 -func getAddressBySockfd(procfs string, pid uint32, fd uint32, src bool) (net.IP, uint16, error) { +func getAddressBySockfd(procfs string, pid uint32, fd uint32) (addressPair, error) { inode, err := getSocketInode(procfs, pid, fd) if err != nil { - return nil, 0, err + return addressPair{}, err } tcppath := fmt.Sprintf("%s/%d/net/tcp", procfs, pid) tcp, err := ioutil.ReadFile(tcppath) if err != nil { - return nil, 0, errors.Wrap(err, 0) + return addressPair{}, errors.Wrap(err, 0) } for _, line := range strings.Split(string(tcp), "\n") { @@ -53,15 +60,28 @@ func getAddressBySockfd(procfs string, pid uint32, fd uint32, src bool) (net.IP, } if inode == parts[INODE_FILED_INDEX] { - if src { - return parseHexAddress(parts[SRC_ADDRESS_FILED_INDEX]) - } else { - return parseHexAddress(parts[DST_ADDRESS_FILED_INDEX]) + srcIp, srcPort, srcErr := parseHexAddress(parts[SRC_ADDRESS_FILED_INDEX]) + + if srcErr != nil { + return addressPair{}, srcErr } + + dstIp, dstPort, dstErr := parseHexAddress(parts[DST_ADDRESS_FILED_INDEX]) + + if dstErr != nil { + return addressPair{}, dstErr + } + + return addressPair{ + srcIp: srcIp, + srcPort: srcPort, + dstIp: dstIp, + dstPort: dstPort, + }, nil } } - return nil, 0, errors.Errorf("address not found [pid: %d] [sockfd: %d] [inode: %s]", pid, fd, inode) + return addressPair{}, errors.Errorf("address not found [pid: %d] [sockfd: %d] [inode: %s]", pid, fd, inode) } func getSocketInode(procfs string, pid uint32, fd uint32) (string, error) { diff --git a/tap/tlstapper/tls_poller.go b/tap/tlstapper/tls_poller.go index 4acc8ca0f..faba94529 100644 --- a/tap/tlstapper/tls_poller.go +++ b/tap/tlstapper/tls_poller.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "fmt" - "net" "sync" "time" @@ -16,10 +15,14 @@ import ( "github.com/cilium/ebpf/perf" "github.com/go-errors/errors" + "github.com/hashicorp/golang-lru/simplelru" "github.com/up9inc/mizu/logger" "github.com/up9inc/mizu/tap/api" ) +const fdCachedItemAvgSize = 40 +const fdCacheMaxItems = 500000 / fdCachedItemAvgSize + type tlsPoller struct { tls *TlsTapper readers map[string]*tlsReader @@ -29,10 +32,12 @@ type tlsPoller struct { extension *api.Extension procfs string pidToNamespace sync.Map + fdCache *simplelru.LRU // Actual typs is map[string]addressPair + evictedCounter int } -func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller { - return &tlsPoller{ +func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) (*tlsPoller, error) { + poller := &tlsPoller{ tls: tls, readers: make(map[string]*tlsReader), closedReaders: make(chan string, 100), @@ -41,6 +46,15 @@ func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsP chunksReader: nil, procfs: procfs, } + + fdCache, err := simplelru.NewLRU(fdCacheMaxItems, poller.fdCacheEvictCallback) + + if err != nil { + return nil, errors.Wrap(err, 0) + } + + poller.fdCache = fdCache + return poller, nil } func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error { @@ -117,35 +131,38 @@ func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) { func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension, emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) error { - ip, port, err := chunk.getAddress() + address, err := p.getSockfdAddressPair(chunk) if err != nil { - return err + address, err = chunk.getAddressPair() + + if err != nil { + return err + } } - key := buildTlsKey(chunk, ip, port) + key := buildTlsKey(address) reader, exists := p.readers[key] if !exists { - reader = p.startNewTlsReader(chunk, ip, port, key, emitter, extension, options, streamsMap) + reader = p.startNewTlsReader(chunk, &address, key, emitter, extension, options, streamsMap) p.readers[key] = reader } - reader.captureTime = time.Now() - reader.chunks <- chunk + reader.newChunk(chunk) if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" { - p.logTls(chunk, ip, port) + p.logTls(chunk, key, reader) } return nil } -func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string, +func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, address *addressPair, key string, emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) *tlsReader { - tcpid := p.buildTcpId(chunk, ip, port) + tcpid := p.buildTcpId(chunk, address) doneHandler := func(r *tlsReader) { p.closeReader(key, r) @@ -197,36 +214,52 @@ func (p *tlsPoller) closeReader(key string, r *tlsReader) { p.closedReaders <- key } -func buildTlsKey(chunk *tlsChunk, ip net.IP, port uint16) string { - return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port) -} +func (p *tlsPoller) getSockfdAddressPair(chunk *tlsChunk) (addressPair, error) { + address, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd) + fdCacheKey := fmt.Sprintf("%d:%d", chunk.Pid, chunk.Fd) -func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpID { - myIp, myPort, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, chunk.isClient()) - - if err != nil { - // May happen if the socket already closed, very likely to happen for localhost - // - myIp = api.UnknownIp - myPort = api.UnknownPort + if err == nil { + if !chunk.isRequest() { + switchedAddress := addressPair{ + srcIp: address.dstIp, + srcPort: address.dstPort, + dstIp: address.srcIp, + dstPort: address.srcPort, + } + p.fdCache.Add(fdCacheKey, switchedAddress) + return switchedAddress, nil + } else { + p.fdCache.Add(fdCacheKey, address) + return address, nil + } } - if chunk.isRequest() { - return api.TcpID{ - SrcIP: myIp.String(), - DstIP: ip.String(), - SrcPort: strconv.FormatUint(uint64(myPort), 10), - DstPort: strconv.FormatUint(uint64(port), 10), - Ident: "", - } - } else { - return api.TcpID{ - SrcIP: ip.String(), - DstIP: myIp.String(), - SrcPort: strconv.FormatUint(uint64(port), 10), - DstPort: strconv.FormatUint(uint64(myPort), 10), - Ident: "", - } + fromCacheIfc, ok := p.fdCache.Get(fdCacheKey) + + if !ok { + return addressPair{}, err + } + + fromCache, ok := fromCacheIfc.(addressPair) + + if !ok { + return address, errors.Errorf("Unable to cast %T to addressPair", fromCacheIfc) + } + + return fromCache, nil +} + +func buildTlsKey(address addressPair) string { + return fmt.Sprintf("%s:%d>%s:%d", address.srcIp, address.srcPort, address.dstIp, address.dstPort) +} + +func (p *tlsPoller) buildTcpId(chunk *tlsChunk, address *addressPair) api.TcpID { + return api.TcpID{ + SrcIP: address.srcIp.String(), + DstIP: address.dstIp.String(), + SrcPort: strconv.FormatUint(uint64(address.srcPort), 10), + DstPort: strconv.FormatUint(uint64(address.dstPort), 10), + Ident: "", } } @@ -257,7 +290,7 @@ func (p *tlsPoller) clearPids() { }) } -func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) { +func (p *tlsPoller) logTls(chunk *tlsChunk, key string, reader *tlsReader) { var flagsStr string if chunk.isClient() { @@ -272,13 +305,18 @@ func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) { flagsStr += "W" } - srcIp, srcPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, true) - dstIp, dstPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, false) - str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "") - logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (fdaddr %v:%v>%v:%v) (recorded %v out of %v starting at %v) - %v - %v", - chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port, - srcIp, srcPort, dstIp, dstPort, - chunk.Recorded, chunk.Len, chunk.Start, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded])) + logger.Log.Infof("[%-44s] %s #%-4d (fd: %d) (recorded %d/%d:%d) - %s - %s", + key, flagsStr, reader.seenChunks, chunk.Fd, + chunk.Recorded, chunk.Len, chunk.Start, + str, hex.EncodeToString(chunk.Data[0:chunk.Recorded])) +} + +func (p *tlsPoller) fdCacheEvictCallback(key interface{}, value interface{}) { + p.evictedCounter = p.evictedCounter + 1 + + if p.evictedCounter%1000000 == 0 { + logger.Log.Infof("Tls fdCache evicted %d items", p.evictedCounter) + } } diff --git a/tap/tlstapper/tls_reader.go b/tap/tlstapper/tls_reader.go index fa1c91611..7856b057f 100644 --- a/tap/tlstapper/tls_reader.go +++ b/tap/tlstapper/tls_reader.go @@ -10,6 +10,7 @@ import ( type tlsReader struct { key string chunks chan *tlsChunk + seenChunks int data []byte doneHandler func(r *tlsReader) progress *api.ReadProgress @@ -23,6 +24,12 @@ type tlsReader struct { reqResMatcher api.RequestResponseMatcher } +func (r *tlsReader) newChunk(chunk *tlsChunk) { + r.captureTime = time.Now() + r.seenChunks = r.seenChunks + 1 + r.chunks <- chunk +} + func (r *tlsReader) Read(p []byte) (int, error) { var chunk *tlsChunk diff --git a/tap/tlstapper/tls_tapper.go b/tap/tlstapper/tls_tapper.go index 63bb340d7..b6fd173e1 100644 --- a/tap/tlstapper/tls_tapper.go +++ b/tap/tlstapper/tls_tapper.go @@ -46,7 +46,13 @@ func (t *TlsTapper) Init(chunksBufferSize int, logBufferSize int, procfs string, return err } - t.poller = newTlsPoller(t, extension, procfs) + var err error + t.poller, err = newTlsPoller(t, extension, procfs) + + if err != nil { + return err + } + return t.poller.init(&t.bpfObjects, chunksBufferSize) } diff --git a/tap/tlstapper/tlstapper_bpfeb.o b/tap/tlstapper/tlstapper_bpfeb.o index cbe4c8a1a..6508e3065 100644 Binary files a/tap/tlstapper/tlstapper_bpfeb.o and b/tap/tlstapper/tlstapper_bpfeb.o differ diff --git a/tap/tlstapper/tlstapper_bpfel.o b/tap/tlstapper/tlstapper_bpfel.o index 971f23e8d..0af7492f4 100644 Binary files a/tap/tlstapper/tlstapper_bpfel.o and b/tap/tlstapper/tlstapper_bpfel.o differ