diff --git a/tap/api/tcp_stream.go b/tap/api/tcp_stream.go index 69f5184ed..16fb01fac 100644 --- a/tap/api/tcp_stream.go +++ b/tap/api/tcp_stream.go @@ -179,3 +179,26 @@ func (t *TcpStream) Close() { reader.Close() } } + +func (t *TcpStream) CloseOtherProtocolDissectors(protocol *Protocol) { + if t.SuperIdentifier.IsClosedOthers { + return + } + + t.SuperIdentifier.Protocol = protocol + + for i := range t.Clients { + reader := &t.Clients[i] + if reader.Extension.Protocol != t.SuperIdentifier.Protocol { + reader.Close() + } + } + for i := range t.Servers { + reader := &t.Servers[i] + if reader.Extension.Protocol != t.SuperIdentifier.Protocol { + reader.Close() + } + } + + t.SuperIdentifier.IsClosedOthers = true +} diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 5e3c76700..2fb3a3f17 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha switch lastMethodFrameMessage.(type) { case *BasicPublish: eventBasicPublish.Body = f.Body - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicDeliver: eventBasicDeliver.Body = f.Body - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) } @@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha NoWait: m.NoWait, Arguments: m.Arguments, } - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicConsume: @@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha NoWait: m.NoWait, Arguments: m.Arguments, } - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *BasicDeliver: @@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha NoWait: m.NoWait, Arguments: m.Arguments, } - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ExchangeDeclare: @@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha NoWait: m.NoWait, Arguments: m.Arguments, } - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ConnectionStart: @@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha Mechanisms: m.Mechanisms, Locales: m.Locales, } - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) case *ConnectionClose: @@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha ClassId: m.ClassId, MethodId: m.MethodId, } - reader.Parent.SuperIdentifier.Protocol = &protocol + reader.Parent.CloseOtherProtocolDissectors(&protocol) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 7d7f7a114..187735275 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -128,7 +128,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha } else if err != nil { continue } - reader.Parent.SuperIdentifier.Protocol = &http11protocol + reader.Parent.CloseOtherProtocolDissectors(&http11protocol) } else if reader.IsClient { var req *http.Request switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher) @@ -137,7 +137,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha } else if err != nil { continue } - reader.Parent.SuperIdentifier.Protocol = &http11protocol + reader.Parent.CloseOtherProtocolDissectors(&http11protocol) // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 if switchingProtocolsHTTP2 { @@ -169,7 +169,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha } else if err != nil { continue } - reader.Parent.SuperIdentifier.Protocol = &http11protocol + reader.Parent.CloseOtherProtocolDissectors(&http11protocol) } } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 7d6caf283..71d31680d 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -48,13 +48,13 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha if err != nil { return err } - reader.Parent.SuperIdentifier.Protocol = &_protocol + reader.Parent.CloseOtherProtocolDissectors(&_protocol) } else { err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, reqResMatcher) if err != nil { return err } - reader.Parent.SuperIdentifier.Protocol = &_protocol + reader.Parent.CloseOtherProtocolDissectors(&_protocol) } } }