Add CloseOtherProtocolDissectors method and use it to synchronously close the other protocol dissectors

This commit is contained in:
M. Mert Yildiran 2022-04-20 13:17:24 +03:00
parent ea85b0b082
commit 0113367984
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
4 changed files with 36 additions and 13 deletions

View File

@ -179,3 +179,26 @@ func (t *TcpStream) Close() {
reader.Close() reader.Close()
} }
} }
func (t *TcpStream) CloseOtherProtocolDissectors(protocol *Protocol) {
if t.SuperIdentifier.IsClosedOthers {
return
}
t.SuperIdentifier.Protocol = protocol
for i := range t.Clients {
reader := &t.Clients[i]
if reader.Extension.Protocol != t.SuperIdentifier.Protocol {
reader.Close()
}
}
for i := range t.Servers {
reader := &t.Servers[i]
if reader.Extension.Protocol != t.SuperIdentifier.Protocol {
reader.Close()
}
}
t.SuperIdentifier.IsClosedOthers = true
}

View File

@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
switch lastMethodFrameMessage.(type) { switch lastMethodFrameMessage.(type) {
case *BasicPublish: case *BasicPublish:
eventBasicPublish.Body = f.Body eventBasicPublish.Body = f.Body
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.Body = f.Body eventBasicDeliver.Body = f.Body
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
} }
@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *BasicConsume: case *BasicConsume:
@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *BasicDeliver: case *BasicDeliver:
@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *ExchangeDeclare: case *ExchangeDeclare:
@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *ConnectionStart: case *ConnectionStart:
@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
Mechanisms: m.Mechanisms, Mechanisms: m.Mechanisms,
Locales: m.Locales, Locales: m.Locales,
} }
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
case *ConnectionClose: case *ConnectionClose:
@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
ClassId: m.ClassId, ClassId: m.ClassId,
MethodId: m.MethodId, MethodId: m.MethodId,
} }
reader.Parent.SuperIdentifier.Protocol = &protocol reader.Parent.CloseOtherProtocolDissectors(&protocol)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.SuperTimer.CaptureTime, reader.Progress.Current(), reader.Emitter, reader.Parent.Origin)
} }

View File

@ -128,7 +128,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
} else if err != nil { } else if err != nil {
continue continue
} }
reader.Parent.SuperIdentifier.Protocol = &http11protocol reader.Parent.CloseOtherProtocolDissectors(&http11protocol)
} else if reader.IsClient { } else if reader.IsClient {
var req *http.Request var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher) switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.Progress, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, options, reqResMatcher)
@ -137,7 +137,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
} else if err != nil { } else if err != nil {
continue continue
} }
reader.Parent.SuperIdentifier.Protocol = &http11protocol reader.Parent.CloseOtherProtocolDissectors(&http11protocol)
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 { if switchingProtocolsHTTP2 {
@ -169,7 +169,7 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
} else if err != nil { } else if err != nil {
continue continue
} }
reader.Parent.SuperIdentifier.Protocol = &http11protocol reader.Parent.CloseOtherProtocolDissectors(&http11protocol)
} }
} }

View File

@ -48,13 +48,13 @@ func (d dissecting) Dissect(b *bufio.Reader, reader *api.TcpReader, options *sha
if err != nil { if err != nil {
return err return err
} }
reader.Parent.SuperIdentifier.Protocol = &_protocol reader.Parent.CloseOtherProtocolDissectors(&_protocol)
} else { } else {
err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, reqResMatcher) err := ReadResponse(b, reader.Parent.Origin, reader.TcpID, reader.CounterPair, reader.SuperTimer, reader.Emitter, reqResMatcher)
if err != nil { if err != nil {
return err return err
} }
reader.Parent.SuperIdentifier.Protocol = &_protocol reader.Parent.CloseOtherProtocolDissectors(&_protocol)
} }
} }
} }