diff --git a/agent/go.mod b/agent/go.mod index 5a525f462..8e2e5fef1 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -118,6 +118,7 @@ require ( github.com/tklauser/numcpus v0.4.0 // indirect github.com/ugorji/go/codec v1.2.6 // indirect github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 // indirect + github.com/wk8/go-ordered-map v1.0.0 // indirect github.com/xlab/treeprint v1.1.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.starlark.net v0.0.0-20220203230714-bb14e151c28f // indirect diff --git a/agent/go.sum b/agent/go.sum index 96aa71501..9e2e1563c 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -704,6 +704,8 @@ github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695AP github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/wI2L/jsondiff v0.1.1 h1:r2TkoEet7E4JMO5+s1RCY2R0LrNPNHY6hbDeow2hRHw= github.com/wI2L/jsondiff v0.1.1/go.mod h1:bAbJSAJXZtfOCZ5y3v7Mfb6UQa3DGdGFjQj1cNv8EcM= +github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8= +github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= diff --git a/tap/go.mod b/tap/go.mod index 7a7231b32..23cee8a50 100644 --- a/tap/go.mod +++ b/tap/go.mod @@ -15,6 +15,7 @@ require ( github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/dbgctl v0.0.0 github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 + github.com/wk8/go-ordered-map v1.0.0 k8s.io/api v0.23.3 ) diff --git a/tap/go.sum b/tap/go.sum index 823fde2ef..dd4f9124c 100644 --- a/tap/go.sum +++ b/tap/go.sum @@ -132,6 +132,7 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/struCoder/pidusage v0.2.1 h1:dFiEgUDkubeIj0XA1NpQ6+8LQmKrLi7NiIQl86E6BoY= @@ -142,6 +143,8 @@ github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq// github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8= +github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/tap/tlstapper/golang_connection.go b/tap/tlstapper/golang_connection.go new file mode 100644 index 000000000..9db74f2c9 --- /dev/null +++ b/tap/tlstapper/golang_connection.go @@ -0,0 +1,27 @@ +package tlstapper + +type golangConnection struct { + Pid uint32 + ConnAddr uint32 + AddressPair addressPair + Request []byte + Response []byte + GotRequest bool + GotResponse bool +} + +func NewGolangConnection(pid uint32, connAddr uint32) *golangConnection { + return &golangConnection{ + Pid: pid, + ConnAddr: connAddr, + } +} + +func (c *golangConnection) setAddressBySockfd(procfs string, pid uint32, fd uint32) error { + addrPair, err := getAddressBySockfd(procfs, pid, fd) + if err != nil { + return err + } + c.AddressPair = addrPair + return nil +} diff --git a/tap/tlstapper/tls_poller.go b/tap/tlstapper/tls_poller.go index 23643fab1..a5b23850c 100644 --- a/tap/tlstapper/tls_poller.go +++ b/tap/tlstapper/tls_poller.go @@ -4,8 +4,10 @@ import ( "bufio" "bytes" "fmt" + "log" "sync" "time" + "unsafe" "encoding/binary" "encoding/hex" @@ -14,26 +16,33 @@ import ( "strings" "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" "github.com/go-errors/errors" "github.com/hashicorp/golang-lru/simplelru" "github.com/up9inc/mizu/logger" "github.com/up9inc/mizu/tap/api" + orderedmap "github.com/wk8/go-ordered-map" ) -const fdCachedItemAvgSize = 40 -const fdCacheMaxItems = 500000 / fdCachedItemAvgSize +const ( + fdCachedItemAvgSize = 40 + fdCacheMaxItems = 500000 / fdCachedItemAvgSize + golangMapLimit = 1 << 10 // 1024 +) type tlsPoller struct { - tls *TlsTapper - readers map[string]*tlsReader - closedReaders chan string - reqResMatcher api.RequestResponseMatcher - chunksReader *perf.Reader - extension *api.Extension - procfs string - pidToNamespace sync.Map - fdCache *simplelru.LRU // Actual typs is map[string]addressPair - evictedCounter int + tls *TlsTapper + readers map[string]*tlsReader + closedReaders chan string + reqResMatcher api.RequestResponseMatcher + chunksReader *perf.Reader + golangReader *ringbuf.Reader + golangReadWriteMap *orderedmap.OrderedMap + extension *api.Extension + procfs string + pidToNamespace sync.Map + fdCache *simplelru.LRU // Actual typs is map[string]addressPair + evictedCounter int } func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) (*tlsPoller, error) { @@ -66,6 +75,12 @@ func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error { return errors.Wrap(err, 0) } + p.golangReader, err = ringbuf.NewReader(bpfObjects.GolangReadWrites) + + if err != nil { + return errors.Wrap(err, 0) + } + return nil } @@ -73,7 +88,7 @@ func (p *tlsPoller) close() error { return p.chunksReader.Close() } -func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) { +func (p *tlsPoller) pollLibssl(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) { chunks := make(chan *tlsChunk) go p.pollChunksPerfBuffer(chunks) @@ -94,6 +109,116 @@ func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptio } } +func (p *tlsPoller) pollGolang(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) { + go p.pollGolangReadWrite(p.golangReader, emitter, options, streamsMap) +} + +func (p *tlsPoller) pollGolangReadWrite(rd *ringbuf.Reader, emitter api.Emitter, options *api.TrafficFilteringOptions, + streamsMap api.TcpStreamMap) { + nativeEndian := p.getByteOrder() + // tlsTapperGolangReadWrite is generated by bpf2go. + var b tlsTapperGolangReadWrite + for { + record, err := rd.Read() + if err != nil { + if errors.Is(err, ringbuf.ErrClosed) { + log.Println("received signal, exiting..") + return + } + log.Printf("reading from reader: %s", err) + continue + } + + // Parse the ringbuf event entry into a tlsTapperGolangReadWrite structure. + if err := binary.Read(bytes.NewBuffer(record.RawSample), nativeEndian, &b); err != nil { + log.Printf("parsing ringbuf event: %s", err) + continue + } + + if p.golangReadWriteMap.Len()+1 > golangMapLimit { + pair := p.golangReadWriteMap.Oldest() + p.golangReadWriteMap.Delete(pair.Key) + } + + pid := uint64(b.Pid) + identifier := pid<<32 + uint64(b.ConnAddr) + + var connection *golangConnection + var _connection interface{} + var ok bool + if _connection, ok = p.golangReadWriteMap.Get(identifier); !ok { + connection = NewGolangConnection(b.Pid, b.ConnAddr) + p.golangReadWriteMap.Set(identifier, connection) + } else { + connection = _connection.(*golangConnection) + } + + if b.IsRequest { + err := connection.setAddressBySockfd(p.procfs, b.Pid, b.Fd) + if err != nil { + log.Printf("Error resolving address pair from fd: %s", err) + continue + } + + connection.Request = make([]byte, len(b.Data[:])) + copy(connection.Request, b.Data[:]) + connection.GotRequest = true + } else { + connection.Response = make([]byte, len(b.Data[:])) + copy(connection.Response, b.Data[:]) + connection.GotResponse = true + } + + if connection.GotRequest && connection.GotResponse { + tcpid := p.buildTcpId(&connection.AddressPair) + + tlsEmitter := &tlsEmitter{ + delegate: emitter, + namespace: p.getNamespace(b.Pid), + } + + reader := &tlsReader{ + chunks: make(chan *tlsChunk, 1), + progress: &api.ReadProgress{}, + tcpID: &tcpid, + isClient: true, + captureTime: time.Now(), + extension: p.extension, + emitter: tlsEmitter, + counterPair: &api.CounterPair{}, + reqResMatcher: p.reqResMatcher, + } + + stream := &tlsStream{ + reader: reader, + } + streamsMap.Store(streamsMap.NextId(), stream) + + reader.parent = stream + + err := p.extension.Dissector.Dissect(bufio.NewReader(bytes.NewReader(connection.Request)), reader, options) + + if err != nil { + logger.Log.Warningf("Error dissecting TLS %v - %v", reader.GetTcpID(), err) + } + + reader.isClient = false + reader.tcpID = &api.TcpID{ + SrcIP: reader.tcpID.DstIP, + DstIP: reader.tcpID.SrcIP, + SrcPort: reader.tcpID.DstPort, + DstPort: reader.tcpID.SrcPort, + } + + err = p.extension.Dissector.Dissect(bufio.NewReader(bytes.NewReader(connection.Response)), reader, options) + + if err != nil { + logger.Log.Warningf("Error dissecting TLS %v - %v", reader.GetTcpID(), err) + } + } + } +} + func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) { logger.Log.Infof("Start polling for tls events") @@ -162,7 +287,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, address *addressPair, key emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) *tlsReader { - tcpid := p.buildTcpId(chunk, address) + tcpid := p.buildTcpId(address) doneHandler := func(r *tlsReader) { p.closeReader(key, r) @@ -252,7 +377,7 @@ func buildTlsKey(address addressPair) string { return fmt.Sprintf("%s:%d>%s:%d", address.srcIp, address.srcPort, address.dstIp, address.dstPort) } -func (p *tlsPoller) buildTcpId(chunk *tlsChunk, address *addressPair) api.TcpID { +func (p *tlsPoller) buildTcpId(address *addressPair) api.TcpID { return api.TcpID{ SrcIP: address.srcIp.String(), DstIP: address.dstIp.String(), @@ -319,3 +444,19 @@ func (p *tlsPoller) fdCacheEvictCallback(key interface{}, value interface{}) { logger.Log.Infof("Tls fdCache evicted %d items", p.evictedCounter) } } + +func (p *tlsPoller) getByteOrder() (byteOrder binary.ByteOrder) { + buf := [2]byte{} + *(*uint16)(unsafe.Pointer(&buf[0])) = uint16(0xABCD) + + switch buf { + case [2]byte{0xCD, 0xAB}: + byteOrder = binary.LittleEndian + case [2]byte{0xAB, 0xCD}: + byteOrder = binary.BigEndian + default: + panic("Could not determine native endianness.") + } + + return +} diff --git a/tap/tlstapper/tls_tapper.go b/tap/tlstapper/tls_tapper.go index ec71035a3..4d8db445b 100644 --- a/tap/tlstapper/tls_tapper.go +++ b/tap/tlstapper/tls_tapper.go @@ -59,7 +59,8 @@ func (t *TlsTapper) Init(chunksBufferSize int, logBufferSize int, procfs string, } func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) { - t.poller.poll(emitter, options, streamsMap) + t.poller.pollLibssl(emitter, options, streamsMap) + t.poller.pollGolang(emitter, options, streamsMap) } func (t *TlsTapper) PollForLogging() {