diff --git a/tap/api/api.go b/tap/api/api.go index 8e6f07df9..8103acd4a 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -431,7 +431,6 @@ type TcpReader interface { type TcpStream interface { SetProtocol(protocol *Protocol) GetOrigin() Capture - GetProtocol() *Protocol GetReqResMatchers() []RequestResponseMatcher GetIsTapTarget() bool GetIsClosed() bool diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 6d024287d..8d77d043e 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -3,7 +3,6 @@ package amqp import ( "bufio" "encoding/json" - "errors" "fmt" "io" "log" @@ -75,10 +74,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. var lastMethodFrameMessage Message for { - if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &protocol { - return errors.New("Identified by another protocol") - } - frame, err := r.ReadFrame() if err == io.EOF { // We must read until we see an EOF... very important! @@ -90,6 +85,8 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. // drop case *HeaderFrame: + reader.GetParent().SetProtocol(&protocol) + // start content state header = f remaining = int(header.Size) @@ -107,20 +104,22 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. } case *BodyFrame: + reader.GetParent().SetProtocol(&protocol) + // continue until terminated remaining -= len(f.Body) switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Body = f.Body - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) case *BasicDeliver: eventBasicDeliver.Body = f.Body - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) } case *MethodFrame: + reader.GetParent().SetProtocol(&protocol) + lastMethodFrameMessage = f.Method switch m := f.Method.(type) { case *BasicPublish: @@ -137,7 +136,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. NoWait: m.NoWait, Arguments: m.Arguments, } - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) case *BasicConsume: @@ -150,7 +148,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. NoWait: m.NoWait, Arguments: m.Arguments, } - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) case *BasicDeliver: @@ -170,7 +167,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. NoWait: m.NoWait, Arguments: m.Arguments, } - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) case *ExchangeDeclare: @@ -184,7 +180,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. NoWait: m.NoWait, Arguments: m.Arguments, } - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) case *ConnectionStart: @@ -195,7 +190,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. Mechanisms: m.Mechanisms, Locales: m.Locales, } - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) case *ConnectionClose: @@ -205,7 +199,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. ClassId: m.ClassId, MethodId: m.MethodId, } - reader.GetParent().SetProtocol(&protocol) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin()) } diff --git a/tap/extensions/amqp/tcp_stream_mock_test.go b/tap/extensions/amqp/tcp_stream_mock_test.go index 29138a2e1..006e0d6e5 100644 --- a/tap/extensions/amqp/tcp_stream_mock_test.go +++ b/tap/extensions/amqp/tcp_stream_mock_test.go @@ -8,7 +8,6 @@ import ( type tcpStream struct { isClosed bool - protocol *api.Protocol isTapTarget bool origin api.Capture reqResMatchers []api.RequestResponseMatcher @@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtocol() *api.Protocol { - return t.protocol -} - func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { return t.reqResMatchers } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 19f26c122..6bc18086b 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -3,7 +3,6 @@ package http import ( "bufio" "encoding/json" - "errors" "fmt" "io" "log" @@ -144,10 +143,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. http2Assembler = createHTTP2Assembler(b) } - if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &http11protocol { - return errors.New("Identified by another protocol") - } - if isHTTP2 { err = handleHTTP2Stream(http2Assembler, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { @@ -200,10 +195,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api. } } - if reader.GetParent().GetProtocol() == nil { - return err - } - return nil } diff --git a/tap/extensions/http/tcp_stream_mock_test.go b/tap/extensions/http/tcp_stream_mock_test.go index 9d3342364..e644fad06 100644 --- a/tap/extensions/http/tcp_stream_mock_test.go +++ b/tap/extensions/http/tcp_stream_mock_test.go @@ -8,7 +8,6 @@ import ( type tcpStream struct { isClosed bool - protocol *api.Protocol isTapTarget bool origin api.Capture reqResMatchers []api.RequestResponseMatcher @@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtocol() *api.Protocol { - return t.protocol -} - func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { return t.reqResMatchers } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 750ca65ed..f0097d229 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -3,7 +3,6 @@ package kafka import ( "bufio" "encoding/json" - "errors" "fmt" "log" "time" @@ -38,10 +37,6 @@ func (d dissecting) Ping() { func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error { reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher) for { - if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &_protocol { - return errors.New("Identified by another protocol") - } - if reader.GetIsClient() { _, _, err := ReadRequest(b, reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reqResMatcher) if err != nil { diff --git a/tap/extensions/kafka/tcp_stream_mock_test.go b/tap/extensions/kafka/tcp_stream_mock_test.go index 9a99d42b6..8d6337327 100644 --- a/tap/extensions/kafka/tcp_stream_mock_test.go +++ b/tap/extensions/kafka/tcp_stream_mock_test.go @@ -8,7 +8,6 @@ import ( type tcpStream struct { isClosed bool - protocol *api.Protocol isTapTarget bool origin api.Capture reqResMatchers []api.RequestResponseMatcher @@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtocol() *api.Protocol { - return t.protocol -} - func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { return t.reqResMatchers } diff --git a/tap/extensions/redis/tcp_stream_mock_test.go b/tap/extensions/redis/tcp_stream_mock_test.go index 450fe7575..e15656b7f 100644 --- a/tap/extensions/redis/tcp_stream_mock_test.go +++ b/tap/extensions/redis/tcp_stream_mock_test.go @@ -8,7 +8,6 @@ import ( type tcpStream struct { isClosed bool - protocol *api.Protocol isTapTarget bool origin api.Capture reqResMatchers []api.RequestResponseMatcher @@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtocol() *api.Protocol { - return t.protocol -} - func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { return t.reqResMatchers } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 0d0532392..c56515b9c 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -83,10 +83,6 @@ func (t *tcpStream) GetOrigin() api.Capture { return t.origin } -func (t *tcpStream) GetProtocol() *api.Protocol { - return t.protocol -} - func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher { return t.reqResMatchers } diff --git a/tap/tlstapper/tls_stream.go b/tap/tlstapper/tls_stream.go index 4f9f02c15..d0077ba59 100644 --- a/tap/tlstapper/tls_stream.go +++ b/tap/tlstapper/tls_stream.go @@ -11,10 +11,6 @@ func (t *tlsStream) GetOrigin() api.Capture { return api.Ebpf } -func (t *tlsStream) GetProtocol() *api.Protocol { - return t.protocol -} - func (t *tlsStream) SetProtocol(protocol *api.Protocol) { t.protocol = protocol }