mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-01 10:36:55 +00:00
* Implement the basis of AMQP request-response matching
* Fix `package.json`
* Add `ExchangeDeclareOk`
* Add `ConnectionCloseOk`
* Add `BasicConsumeOk`
* Add `QueueBindOk`
* Add `representEmptyResponse` and fix `BasicPublish` and `BasicDeliver`
* Fix ident and matcher, add `connectionOpen`, `channelOpen`, `connectionTune`, `basicCancel`
* Fix linter
* Fix the unit tests
* #run_acceptance_tests
* #run_acceptance_tests
* Fix the tests #run_acceptance_tests
* Log don't panic
* Don't skip AMQP acceptance tests #run_acceptance_tests
* Revert "Don't skip AMQP acceptance tests #run_acceptance_tests"
This reverts commit c60e9cf747
.
* Remove `Details` section from `representEmpty`
* Add `This request or response has no data.` text
114 lines
3.4 KiB
Go
114 lines
3.4 KiB
Go
package amqp
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{channel_id}_{class_id}_{method_id}
|
|
type requestResponseMatcher struct {
|
|
openMessagesMap *sync.Map
|
|
}
|
|
|
|
func createResponseRequestMatcher() api.RequestResponseMatcher {
|
|
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
|
|
return matcher.openMessagesMap
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) emitEvent(isRequest bool, ident string, method string, event interface{}, reader api.TcpReader) {
|
|
reader.GetParent().SetProtocol(&protocol)
|
|
|
|
var item *api.OutputChannelItem
|
|
if isRequest {
|
|
item = matcher.registerRequest(ident, method, event, reader.GetCaptureTime(), reader.GetReadProgress().Current())
|
|
} else {
|
|
item = matcher.registerResponse(ident, method, event, reader.GetCaptureTime(), reader.GetReadProgress().Current())
|
|
}
|
|
|
|
if item != nil {
|
|
item.ConnectionInfo = &api.ConnectionInfo{
|
|
ClientIP: reader.GetTcpID().SrcIP,
|
|
ClientPort: reader.GetTcpID().SrcPort,
|
|
ServerIP: reader.GetTcpID().DstIP,
|
|
ServerPort: reader.GetTcpID().DstPort,
|
|
IsOutgoing: true,
|
|
}
|
|
item.Capture = reader.GetParent().GetOrigin()
|
|
reader.GetEmitter().Emit(item)
|
|
}
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerRequest(ident string, method string, request interface{}, captureTime time.Time, captureSize int) *api.OutputChannelItem {
|
|
requestAMQPMessage := api.GenericMessage{
|
|
IsRequest: true,
|
|
CaptureTime: captureTime,
|
|
CaptureSize: captureSize,
|
|
Payload: AMQPPayload{
|
|
Data: &AMQPWrapper{
|
|
Method: method,
|
|
Url: "",
|
|
Details: request,
|
|
},
|
|
},
|
|
}
|
|
|
|
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
|
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
|
responseAMQPMessage := response.(*api.GenericMessage)
|
|
if responseAMQPMessage.IsRequest {
|
|
return nil
|
|
}
|
|
return matcher.preparePair(&requestAMQPMessage, responseAMQPMessage)
|
|
}
|
|
|
|
matcher.openMessagesMap.Store(ident, &requestAMQPMessage)
|
|
return nil
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerResponse(ident string, method string, response interface{}, captureTime time.Time, captureSize int) *api.OutputChannelItem {
|
|
responseAMQPMessage := api.GenericMessage{
|
|
IsRequest: false,
|
|
CaptureTime: captureTime,
|
|
CaptureSize: captureSize,
|
|
Payload: AMQPPayload{
|
|
Data: &AMQPWrapper{
|
|
Method: method,
|
|
Url: "",
|
|
Details: response,
|
|
},
|
|
},
|
|
}
|
|
|
|
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
|
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
|
requestAMQPMessage := request.(*api.GenericMessage)
|
|
if !requestAMQPMessage.IsRequest {
|
|
return nil
|
|
}
|
|
return matcher.preparePair(requestAMQPMessage, &responseAMQPMessage)
|
|
}
|
|
|
|
matcher.openMessagesMap.Store(ident, &responseAMQPMessage)
|
|
return nil
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) preparePair(requestAMQPMessage *api.GenericMessage, responseAMQPMessage *api.GenericMessage) *api.OutputChannelItem {
|
|
return &api.OutputChannelItem{
|
|
Protocol: protocol,
|
|
Timestamp: requestAMQPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
|
|
ConnectionInfo: nil,
|
|
Pair: &api.RequestResponsePair{
|
|
Request: *requestAMQPMessage,
|
|
Response: *responseAMQPMessage,
|
|
},
|
|
}
|
|
}
|