mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-25 04:17:25 +00:00
Fix tls + creating tls_stream (#1058)
This commit is contained in:
@@ -21,34 +21,25 @@ import (
|
||||
)
|
||||
|
||||
type tlsPoller struct {
|
||||
tls *TlsTapper
|
||||
readers map[string]api.TcpReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
pidToNamespace sync.Map
|
||||
isClosed bool
|
||||
protoIdentifier *api.ProtoIdentifier
|
||||
isTapTarget bool
|
||||
origin api.Capture
|
||||
createdAt time.Time
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
pidToNamespace sync.Map
|
||||
}
|
||||
|
||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
||||
return &tlsPoller{
|
||||
tls: tls,
|
||||
readers: make(map[string]api.TcpReader),
|
||||
closedReaders: make(chan string, 100),
|
||||
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
|
||||
extension: extension,
|
||||
chunksReader: nil,
|
||||
procfs: procfs,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
isTapTarget: true,
|
||||
origin: api.Ebpf,
|
||||
createdAt: time.Now(),
|
||||
tls: tls,
|
||||
readers: make(map[string]*tlsReader),
|
||||
closedReaders: make(chan string, 100),
|
||||
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
|
||||
extension: extension,
|
||||
chunksReader: nil,
|
||||
procfs: procfs,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,24 +126,13 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
||||
key := buildTlsKey(chunk, ip, port)
|
||||
reader, exists := p.readers[key]
|
||||
|
||||
newReader := NewTlsReader(
|
||||
key,
|
||||
func(r *tlsReader) {
|
||||
p.closeReader(key, r)
|
||||
},
|
||||
chunk.isRequest(),
|
||||
p,
|
||||
)
|
||||
|
||||
if !exists {
|
||||
reader = p.startNewTlsReader(chunk, ip, port, key, extension, newReader, options)
|
||||
reader = p.startNewTlsReader(chunk, ip, port, key, emitter, extension, options)
|
||||
p.readers[key] = reader
|
||||
}
|
||||
|
||||
tlsReader := reader.(*tlsReader)
|
||||
|
||||
tlsReader.setCaptureTime(time.Now())
|
||||
tlsReader.sendChunk(chunk)
|
||||
reader.captureTime = time.Now()
|
||||
reader.chunks <- chunk
|
||||
|
||||
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
|
||||
p.logTls(chunk, ip, port)
|
||||
@@ -161,25 +141,46 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string, extension *api.Extension,
|
||||
reader api.TcpReader, options *api.TrafficFilteringOptions) api.TcpReader {
|
||||
func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string,
|
||||
emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions) *tlsReader {
|
||||
|
||||
tcpid := p.buildTcpId(chunk, ip, port)
|
||||
|
||||
tlsReader := reader.(*tlsReader)
|
||||
tlsReader.setTcpID(&tcpid)
|
||||
doneHandler := func(r *tlsReader) {
|
||||
p.closeReader(key, r)
|
||||
}
|
||||
|
||||
tlsReader.setEmitter(&tlsEmitter{
|
||||
delegate: reader.GetEmitter(),
|
||||
tlsEmitter := &tlsEmitter{
|
||||
delegate: emitter,
|
||||
namespace: p.getNamespace(chunk.Pid),
|
||||
})
|
||||
}
|
||||
|
||||
reader := &tlsReader{
|
||||
key: key,
|
||||
chunks: make(chan *tlsChunk, 1),
|
||||
doneHandler: doneHandler,
|
||||
progress: &api.ReadProgress{},
|
||||
tcpID: &tcpid,
|
||||
isClient: chunk.isRequest(),
|
||||
captureTime: time.Now(),
|
||||
extension: extension,
|
||||
emitter: tlsEmitter,
|
||||
counterPair: &api.CounterPair{},
|
||||
reqResMatcher: p.reqResMatcher,
|
||||
}
|
||||
|
||||
stream := &tlsStream{
|
||||
reader: reader,
|
||||
protoIdentifier: &api.ProtoIdentifier{},
|
||||
}
|
||||
|
||||
reader.parent = stream
|
||||
|
||||
go dissect(extension, reader, options)
|
||||
return reader
|
||||
}
|
||||
|
||||
func dissect(extension *api.Extension, reader api.TcpReader,
|
||||
options *api.TrafficFilteringOptions) {
|
||||
func dissect(extension *api.Extension, reader *tlsReader, options *api.TrafficFilteringOptions) {
|
||||
b := bufio.NewReader(reader)
|
||||
|
||||
err := extension.Dissector.Dissect(b, reader, options)
|
||||
@@ -279,27 +280,3 @@ func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
srcIp, srcPort, dstIp, dstPort,
|
||||
chunk.Recorded, chunk.Len, chunk.Start, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||
}
|
||||
|
||||
func (p *tlsPoller) SetProtocol(protocol *api.Protocol) {
|
||||
// TODO: Implement
|
||||
}
|
||||
|
||||
func (p *tlsPoller) GetOrigin() api.Capture {
|
||||
return p.origin
|
||||
}
|
||||
|
||||
func (p *tlsPoller) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||
return p.protoIdentifier
|
||||
}
|
||||
|
||||
func (p *tlsPoller) GetReqResMatcher() api.RequestResponseMatcher {
|
||||
return p.reqResMatcher
|
||||
}
|
||||
|
||||
func (p *tlsPoller) GetIsTapTarget() bool {
|
||||
return p.isTapTarget
|
||||
}
|
||||
|
||||
func (p *tlsPoller) GetIsClosed() bool {
|
||||
return p.isClosed
|
||||
}
|
||||
|
Reference in New Issue
Block a user