diff --git a/tap/api/api.go b/tap/api/api.go index 74929210b..fd2a7ce52 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -104,11 +104,6 @@ type OutputChannelItem struct { Namespace string } -type ProtoIdentifier struct { - Protocol *Protocol - IsClosedOthers bool -} - type ReadProgress struct { readBytes int lastCurrent int @@ -419,13 +414,12 @@ type TcpReader interface { GetCaptureTime() time.Time GetEmitter() Emitter GetIsClosed() bool - GetExtension() *Extension } type TcpStream interface { SetProtocol(protocol *Protocol) GetOrigin() Capture - GetProtoIdentifier() *ProtoIdentifier + GetProtocol() *Protocol GetReqResMatchers() []RequestResponseMatcher GetIsTapTarget() bool GetIsClosed() bool diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index a1c0dd9aa..c08264611 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -75,7 +75,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. var lastMethodFrameMessage Message for { - if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &protocol { + if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &protocol { return errors.New("Identified by another protocol") } diff --git a/tap/extensions/amqp/tcp_reader_mock_test.go b/tap/extensions/amqp/tcp_reader_mock_test.go index dd37fc7a4..3081e449e 100644 --- a/tap/extensions/amqp/tcp_reader_mock_test.go +++ b/tap/extensions/amqp/tcp_reader_mock_test.go @@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter { func (reader *tcpReader) GetIsClosed() bool { return reader.isClosed } - -func (reader *tcpReader) GetExtension() *api.Extension { - return reader.extension -} diff --git a/tap/extensions/amqp/tcp_stream_mock_test.go b/tap/extensions/amqp/tcp_stream_mock_test.go index ae68e5982..29138a2e1 100644 --- a/tap/extensions/amqp/tcp_stream_mock_test.go +++ b/tap/extensions/amqp/tcp_stream_mock_test.go @@ -7,18 +7,17 @@ import ( ) type tcpStream struct { - isClosed bool - protoIdentifier *api.ProtoIdentifier - isTapTarget bool - origin api.Capture - reqResMatchers []api.RequestResponseMatcher + isClosed bool + protocol *api.Protocol + isTapTarget bool + origin api.Capture + reqResMatchers []api.RequestResponseMatcher sync.Mutex } func NewTcpStream(capture api.Capture) api.TcpStream { return &tcpStream{ - origin: capture, - protoIdentifier: &api.ProtoIdentifier{}, + origin: capture, } } @@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier { - return t.protoIdentifier +func (t *tcpStream) GetProtocol() *api.Protocol { + return t.protocol } func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 4b49e392b..7b439e021 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -116,7 +116,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. http2Assembler = createHTTP2Assembler(b) } - if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &http11protocol { + if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &http11protocol { return errors.New("Identified by another protocol") } @@ -172,7 +172,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. } } - if reader.GetParent().GetProtoIdentifier().Protocol == nil { + if reader.GetParent().GetProtocol() == nil { return err } diff --git a/tap/extensions/http/tcp_reader_mock_test.go b/tap/extensions/http/tcp_reader_mock_test.go index bad15a0fd..87baf4293 100644 --- a/tap/extensions/http/tcp_reader_mock_test.go +++ b/tap/extensions/http/tcp_reader_mock_test.go @@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter { func (reader *tcpReader) GetIsClosed() bool { return reader.isClosed } - -func (reader *tcpReader) GetExtension() *api.Extension { - return reader.extension -} diff --git a/tap/extensions/http/tcp_stream_mock_test.go b/tap/extensions/http/tcp_stream_mock_test.go index ca1b5ee8a..9d3342364 100644 --- a/tap/extensions/http/tcp_stream_mock_test.go +++ b/tap/extensions/http/tcp_stream_mock_test.go @@ -7,18 +7,17 @@ import ( ) type tcpStream struct { - isClosed bool - protoIdentifier *api.ProtoIdentifier - isTapTarget bool - origin api.Capture - reqResMatchers []api.RequestResponseMatcher + isClosed bool + protocol *api.Protocol + isTapTarget bool + origin api.Capture + reqResMatchers []api.RequestResponseMatcher sync.Mutex } func NewTcpStream(capture api.Capture) api.TcpStream { return &tcpStream{ - origin: capture, - protoIdentifier: &api.ProtoIdentifier{}, + origin: capture, } } @@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier { - return t.protoIdentifier +func (t *tcpStream) GetProtocol() *api.Protocol { + return t.protocol } func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 151e498d7..556abdb80 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -38,7 +38,7 @@ func (d dissecting) Ping() { func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error { reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher) for { - if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &_protocol { + if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &_protocol { return errors.New("Identified by another protocol") } diff --git a/tap/extensions/kafka/tcp_reader_mock_test.go b/tap/extensions/kafka/tcp_reader_mock_test.go index 0e9355b92..9bcc5619b 100644 --- a/tap/extensions/kafka/tcp_reader_mock_test.go +++ b/tap/extensions/kafka/tcp_reader_mock_test.go @@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter { func (reader *tcpReader) GetIsClosed() bool { return reader.isClosed } - -func (reader *tcpReader) GetExtension() *api.Extension { - return reader.extension -} diff --git a/tap/extensions/kafka/tcp_stream_mock_test.go b/tap/extensions/kafka/tcp_stream_mock_test.go index d53006e88..9a99d42b6 100644 --- a/tap/extensions/kafka/tcp_stream_mock_test.go +++ b/tap/extensions/kafka/tcp_stream_mock_test.go @@ -7,18 +7,17 @@ import ( ) type tcpStream struct { - isClosed bool - protoIdentifier *api.ProtoIdentifier - isTapTarget bool - origin api.Capture - reqResMatchers []api.RequestResponseMatcher + isClosed bool + protocol *api.Protocol + isTapTarget bool + origin api.Capture + reqResMatchers []api.RequestResponseMatcher sync.Mutex } func NewTcpStream(capture api.Capture) api.TcpStream { return &tcpStream{ - origin: capture, - protoIdentifier: &api.ProtoIdentifier{}, + origin: capture, } } @@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier { - return t.protoIdentifier +func (t *tcpStream) GetProtocol() *api.Protocol { + return t.protocol } func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { diff --git a/tap/extensions/redis/tcp_reader_mock_test.go b/tap/extensions/redis/tcp_reader_mock_test.go index 223d7fd9a..6b7f3618e 100644 --- a/tap/extensions/redis/tcp_reader_mock_test.go +++ b/tap/extensions/redis/tcp_reader_mock_test.go @@ -78,7 +78,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter { func (reader *tcpReader) GetIsClosed() bool { return reader.isClosed } - -func (reader *tcpReader) GetExtension() *api.Extension { - return reader.extension -} diff --git a/tap/extensions/redis/tcp_stream_mock_test.go b/tap/extensions/redis/tcp_stream_mock_test.go index 304c85da0..450fe7575 100644 --- a/tap/extensions/redis/tcp_stream_mock_test.go +++ b/tap/extensions/redis/tcp_stream_mock_test.go @@ -7,18 +7,17 @@ import ( ) type tcpStream struct { - isClosed bool - protoIdentifier *api.ProtoIdentifier - isTapTarget bool - origin api.Capture - reqResMatchers []api.RequestResponseMatcher + isClosed bool + protocol *api.Protocol + isTapTarget bool + origin api.Capture + reqResMatchers []api.RequestResponseMatcher sync.Mutex } func NewTcpStream(capture api.Capture) api.TcpStream { return &tcpStream{ - origin: capture, - protoIdentifier: &api.ProtoIdentifier{}, + origin: capture, } } @@ -28,8 +27,8 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier { - return t.protoIdentifier +func (t *tcpStream) GetProtocol() *api.Protocol { + return t.protocol } func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 5af4b828d..ba40cbf2f 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -3,11 +3,9 @@ package tap import ( "bufio" "io" - "io/ioutil" "sync" "time" - "github.com/up9inc/mizu/logger" "github.com/up9inc/mizu/tap/api" ) @@ -23,44 +21,44 @@ type tcpReader struct { isClient bool isOutgoing bool msgQueue chan api.TcpReaderDataMsg // Channel of captured reassembled tcp payload + buffer []byte + exhaustBuffer bool data []byte progress *api.ReadProgress captureTime time.Time parent *tcpStream packetsSeen uint - extension *api.Extension emitter api.Emitter counterPair *api.CounterPair reqResMatcher api.RequestResponseMatcher sync.Mutex } -func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) *tcpReader { +func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, emitter api.Emitter) *tcpReader { return &tcpReader{ - msgQueue: msgQueue, - progress: progress, - ident: ident, - tcpID: tcpId, - captureTime: captureTime, - parent: parent, - isClient: isClient, - isOutgoing: isOutgoing, - extension: extension, - emitter: emitter, - counterPair: counterPair, - reqResMatcher: reqResMatcher, + msgQueue: msgQueue, + progress: progress, + ident: ident, + tcpID: tcpId, + captureTime: captureTime, + parent: parent, + isClient: isClient, + isOutgoing: isOutgoing, + emitter: emitter, } } func (reader *tcpReader) run(options *api.TrafficFilteringOptions, wg *sync.WaitGroup) { defer wg.Done() - b := bufio.NewReader(reader) - err := reader.extension.Dissector.Dissect(b, reader, options) - if err != nil { - _, err = io.Copy(ioutil.Discard, reader) - if err != nil { - logger.Log.Errorf("%v", err) + for i, extension := range extensions { + reader.reqResMatcher = reader.parent.reqResMatchers[i] + reader.counterPair = reader.parent.counterPairs[i] + b := bufio.NewReader(reader) + extension.Dissector.Dissect(b, reader, options) + if reader.parent.protocol != nil { + break } + reader.exhaustBuffer = true } } @@ -81,7 +79,17 @@ func (reader *tcpReader) sendMsgIfNotClosed(msg api.TcpReaderDataMsg) { reader.Unlock() } +func (reader *tcpReader) isProtocolIdentified() bool { + return reader.parent.protocol != nil +} + func (reader *tcpReader) Read(p []byte) (int, error) { + if reader.exhaustBuffer { + l := copy(p, reader.buffer) + reader.exhaustBuffer = false + return l, nil + } + var msg api.TcpReaderDataMsg ok := true @@ -101,6 +109,9 @@ func (reader *tcpReader) Read(p []byte) (int, error) { } l := copy(p, reader.data) + if !reader.isProtocolIdentified() { + reader.buffer = append(reader.buffer, reader.data...) + } reader.data = reader.data[l:] reader.progress.Feed(l) @@ -142,7 +153,3 @@ func (reader *tcpReader) GetEmitter() api.Emitter { func (reader *tcpReader) GetIsClosed() bool { return reader.isClosed } - -func (reader *tcpReader) GetExtension() *api.Extension { - return reader.extension -} diff --git a/tap/tcp_reassembly_stream.go b/tap/tcp_reassembly_stream.go index 36edefc41..cee096396 100644 --- a/tap/tcp_reassembly_stream.go +++ b/tap/tcp_reassembly_stream.go @@ -6,7 +6,6 @@ import ( "github.com/google/gopacket" "github.com/google/gopacket/layers" // pulls in all layers decoders "github.com/google/gopacket/reassembly" - "github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/diagnose" ) @@ -22,10 +21,10 @@ type tcpReassemblyStream struct { fsmerr bool optchecker reassembly.TCPOptionCheck isDNS bool - tcpStream api.TcpStream + tcpStream *tcpStream } -func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream api.TcpStream) ReassemblyStream { +func NewTcpReassemblyStream(ident string, tcp *layers.TCP, fsmOptions reassembly.TCPSimpleFSMOptions, stream *tcpStream) ReassemblyStream { return &tcpReassemblyStream{ ident: ident, tcpState: reassembly.NewTCPSimpleFSM(fsmOptions), @@ -145,17 +144,10 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas // This channel is read by an tcpReader object diagnose.AppStats.IncReassembledTcpPayloadsCount() timestamp := ac.GetCaptureInfo().Timestamp - stream := t.tcpStream.(*tcpStream) if dir == reassembly.TCPDirClientToServer { - for i := range stream.getClients() { - reader := stream.getClient(i) - reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp)) - } + t.tcpStream.client.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp)) } else { - for i := range stream.getServers() { - reader := stream.getServer(i) - reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp)) - } + t.tcpStream.server.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp)) } } } @@ -163,7 +155,7 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas func (t *tcpReassemblyStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { if t.tcpStream.GetIsTapTarget() && !t.tcpStream.GetIsClosed() { - t.tcpStream.(*tcpStream).close() + t.tcpStream.close() } // do not remove the connection to allow last ACK return false diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 753d00b20..876f1f5b6 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -13,25 +13,25 @@ import ( * In our implementation, we pass information from ReassembledSG to the TcpReader through a shared channel. */ type tcpStream struct { - id int64 - isClosed bool - protoIdentifier *api.ProtoIdentifier - isTapTarget bool - clients []*tcpReader - servers []*tcpReader - origin api.Capture - reqResMatchers []api.RequestResponseMatcher - createdAt time.Time - streamsMap api.TcpStreamMap + id int64 + isClosed bool + protocol *api.Protocol + isTapTarget bool + client *tcpReader + server *tcpReader + origin api.Capture + counterPairs []*api.CounterPair + reqResMatchers []api.RequestResponseMatcher + createdAt time.Time + streamsMap api.TcpStreamMap sync.Mutex } func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) *tcpStream { return &tcpStream{ - isTapTarget: isTapTarget, - protoIdentifier: &api.ProtoIdentifier{}, - streamsMap: streamsMap, - origin: capture, + isTapTarget: isTapTarget, + streamsMap: streamsMap, + origin: capture, } } @@ -55,38 +55,12 @@ func (t *tcpStream) close() { t.streamsMap.Delete(t.id) - for i := range t.clients { - reader := t.clients[i] - reader.close() - } - for i := range t.servers { - reader := t.servers[i] - reader.close() - } + t.client.close() + t.server.close() } -func (t *tcpStream) addClient(reader *tcpReader) { - t.clients = append(t.clients, reader) -} - -func (t *tcpStream) addServer(reader *tcpReader) { - t.servers = append(t.servers, reader) -} - -func (t *tcpStream) getClients() []*tcpReader { - return t.clients -} - -func (t *tcpStream) getServers() []*tcpReader { - return t.servers -} - -func (t *tcpStream) getClient(index int) *tcpReader { - return t.clients[index] -} - -func (t *tcpStream) getServer(index int) *tcpReader { - return t.servers[index] +func (t *tcpStream) addCounterPair(counterPair *api.CounterPair) { + t.counterPairs = append(t.counterPairs, counterPair) } func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) { @@ -94,37 +68,17 @@ func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) { } func (t *tcpStream) SetProtocol(protocol *api.Protocol) { - t.Lock() - defer t.Unlock() - - if t.protoIdentifier.IsClosedOthers { - return - } - - t.protoIdentifier.Protocol = protocol - - for i := range t.clients { - reader := t.clients[i] - if reader.GetExtension().Protocol != t.protoIdentifier.Protocol { - reader.close() - } - } - for i := range t.servers { - reader := t.servers[i] - if reader.GetExtension().Protocol != t.protoIdentifier.Protocol { - reader.close() - } - } - - t.protoIdentifier.IsClosedOthers = true + t.protocol = protocol + t.client.buffer = []byte{} + t.server.buffer = []byte{} } func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier { - return t.protoIdentifier +func (t *tcpStream) GetProtocol() *api.Protocol { + return t.protocol } func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 7803d9ec2..6aa3ebe9f 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -62,62 +62,56 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream) if stream.GetIsTapTarget() { stream.setId(factory.streamsMap.NextId()) - for i, extension := range extensions { - reqResMatcher := extension.Dissector.NewResponseRequestMatcher() - stream.addReqResMatcher(reqResMatcher) + for _, extension := range extensions { counterPair := &api.CounterPair{ Request: 0, Response: 0, } - stream.addClient( - NewTcpReader( - make(chan api.TcpReaderDataMsg), - &api.ReadProgress{}, - fmt.Sprintf("%s %s", net, transport), - &api.TcpID{ - SrcIP: srcIp, - DstIP: dstIp, - SrcPort: srcPort, - DstPort: dstPort, - }, - time.Time{}, - stream, - true, - props.isOutgoing, - extension, - factory.emitter, - counterPair, - reqResMatcher, - ), - ) - stream.addServer( - NewTcpReader( - make(chan api.TcpReaderDataMsg), - &api.ReadProgress{}, - fmt.Sprintf("%s %s", net, transport), - &api.TcpID{ - SrcIP: net.Dst().String(), - DstIP: net.Src().String(), - SrcPort: transport.Dst().String(), - DstPort: transport.Src().String(), - }, - time.Time{}, - stream, - false, - props.isOutgoing, - extension, - factory.emitter, - counterPair, - reqResMatcher, - ), - ) + stream.addCounterPair(counterPair) - factory.streamsMap.Store(stream.getId(), stream) - - factory.wg.Add(2) - go stream.getClient(i).run(filteringOptions, &factory.wg) - go stream.getServer(i).run(filteringOptions, &factory.wg) + reqResMatcher := extension.Dissector.NewResponseRequestMatcher() + stream.addReqResMatcher(reqResMatcher) } + + stream.client = NewTcpReader( + make(chan api.TcpReaderDataMsg), + &api.ReadProgress{}, + fmt.Sprintf("%s %s", net, transport), + &api.TcpID{ + SrcIP: srcIp, + DstIP: dstIp, + SrcPort: srcPort, + DstPort: dstPort, + }, + time.Time{}, + stream, + true, + props.isOutgoing, + factory.emitter, + ) + + stream.server = NewTcpReader( + make(chan api.TcpReaderDataMsg), + &api.ReadProgress{}, + fmt.Sprintf("%s %s", net, transport), + &api.TcpID{ + SrcIP: net.Dst().String(), + DstIP: net.Src().String(), + SrcPort: transport.Dst().String(), + DstPort: transport.Src().String(), + }, + time.Time{}, + stream, + false, + props.isOutgoing, + factory.emitter, + ) + + factory.streamsMap.Store(stream.getId(), stream) + + factory.wg.Add(2) + go stream.client.run(filteringOptions, &factory.wg) + go stream.server.run(filteringOptions, &factory.wg) } return reassemblyStream } diff --git a/tap/tcp_streams_map.go b/tap/tcp_streams_map.go index 523a06f2a..05a157ce6 100644 --- a/tap/tcp_streams_map.go +++ b/tap/tcp_streams_map.go @@ -55,7 +55,7 @@ func (streamMap *tcpStreamMap) CloseTimedoutTcpStreamChannels() { return true } - if stream.protoIdentifier.Protocol == nil { + if stream.protocol == nil { if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) { stream.close() diagnose.AppStats.IncDroppedTcpStreams() diff --git a/tap/tlstapper/tls_poller.go b/tap/tlstapper/tls_poller.go index 4acc8ca0f..3f5b35834 100644 --- a/tap/tlstapper/tls_poller.go +++ b/tap/tlstapper/tls_poller.go @@ -171,8 +171,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k } stream := &tlsStream{ - reader: reader, - protoIdentifier: &api.ProtoIdentifier{}, + reader: reader, } streamsMap.Store(streamsMap.NextId(), stream) diff --git a/tap/tlstapper/tls_reader.go b/tap/tlstapper/tls_reader.go index fa1c91611..42f1c93b4 100644 --- a/tap/tlstapper/tls_reader.go +++ b/tap/tlstapper/tls_reader.go @@ -87,7 +87,3 @@ func (r *tlsReader) GetEmitter() api.Emitter { func (r *tlsReader) GetIsClosed() bool { return false } - -func (r *tlsReader) GetExtension() *api.Extension { - return r.extension -} diff --git a/tap/tlstapper/tls_stream.go b/tap/tlstapper/tls_stream.go index 09c447f13..4f9f02c15 100644 --- a/tap/tlstapper/tls_stream.go +++ b/tap/tlstapper/tls_stream.go @@ -3,20 +3,20 @@ package tlstapper import "github.com/up9inc/mizu/tap/api" type tlsStream struct { - reader *tlsReader - protoIdentifier *api.ProtoIdentifier + reader *tlsReader + protocol *api.Protocol } func (t *tlsStream) GetOrigin() api.Capture { return api.Ebpf } -func (t *tlsStream) GetProtoIdentifier() *api.ProtoIdentifier { - return t.protoIdentifier +func (t *tlsStream) GetProtocol() *api.Protocol { + return t.protocol } func (t *tlsStream) SetProtocol(protocol *api.Protocol) { - t.protoIdentifier.Protocol = protocol + t.protocol = protocol } func (t *tlsStream) GetReqResMatchers() []api.RequestResponseMatcher {