mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-28 13:55:47 +00:00
* Add a PCAP-based testbed * Fix typos * Download PCAPs from the Google Cloud bucket * Add a Python script to automate the PCAP testbed * Dump the test suite into a file named `suite.json` * Serialize entries separately * Dissect individual TCP streams one by one through separate PCAP files * Improve the reliability a little bit * Ditch the individual TCP streams idea * Fix some issues in Kafka * Print the total number of packets and TCP streams * Fix an interface conversion error in AMQP * Print the total number of returning items from the dissectors * Print the total number of returning items from the dissectors really * Fix a possible race condition * Do atomic increments just to be sure * Print the total number of Redis `Dissect` calls * Improve the request-response matching in Redis by including the TCP stream ID * Update the request-response pair matching key format in HTTP and Kafka * Rearrange the test suite * Add more queries to the test suite * Remove the debug prints * Add the assertions * Close the WebSocket connection faster * Make `MIZU_TEST` environment variable a shared constant * Add `test-lint` rule * Fix several issues in Kafka * Update the test suite * Add more queries * Fix the `test-lint` rule * Exit only after PCAP EOF * Add more queries * Update `suite.json` * Make the tests more stable * Revert the bad changes * Run `go mod tidy` on `tap/`
84 lines
2.5 KiB
Go
84 lines
2.5 KiB
Go
package http
|
|
|
|
import (
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
// reqResMatcher is the package-level request/response matcher, created once
// at package initialization by createResponseRequestMatcher.
var reqResMatcher = createResponseRequestMatcher() // global
|
|
|
|
// requestResponseMatcher pairs HTTP requests with their responses.
//
// openMessagesMap holds the messages that are still waiting for their
// counterpart. Entries are keyed by the ident string supplied by the caller,
// documented as having the form
//
//	{client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
//
// and every stored value is a *api.GenericMessage.
type requestResponseMatcher struct {
	openMessagesMap *sync.Map
}
|
|
|
|
func createResponseRequestMatcher() requestResponseMatcher {
|
|
newMatcher := &requestResponseMatcher{openMessagesMap: &sync.Map{}}
|
|
return *newMatcher
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
|
|
requestHTTPMessage := api.GenericMessage{
|
|
IsRequest: true,
|
|
CaptureTime: captureTime,
|
|
Payload: api.HTTPPayload{
|
|
Type: TypeHttpRequest,
|
|
Data: request,
|
|
},
|
|
}
|
|
|
|
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
|
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
|
responseHTTPMessage := response.(*api.GenericMessage)
|
|
if responseHTTPMessage.IsRequest {
|
|
return nil
|
|
}
|
|
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage, protoMinor)
|
|
}
|
|
|
|
matcher.openMessagesMap.Store(ident, &requestHTTPMessage)
|
|
return nil
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
|
|
responseHTTPMessage := api.GenericMessage{
|
|
IsRequest: false,
|
|
CaptureTime: captureTime,
|
|
Payload: api.HTTPPayload{
|
|
Type: TypeHttpResponse,
|
|
Data: response,
|
|
},
|
|
}
|
|
|
|
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
|
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
|
requestHTTPMessage := request.(*api.GenericMessage)
|
|
if !requestHTTPMessage.IsRequest {
|
|
return nil
|
|
}
|
|
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage, protoMinor)
|
|
}
|
|
|
|
matcher.openMessagesMap.Store(ident, &responseHTTPMessage)
|
|
return nil
|
|
}
|
|
|
|
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage, protoMinor int) *api.OutputChannelItem {
|
|
protocol := http11protocol
|
|
if protoMinor == 0 {
|
|
protocol = http10protocol
|
|
}
|
|
return &api.OutputChannelItem{
|
|
Protocol: protocol,
|
|
Timestamp: requestHTTPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
|
|
ConnectionInfo: nil,
|
|
Pair: &api.RequestResponsePair{
|
|
Request: *requestHTTPMessage,
|
|
Response: *responseHTTPMessage,
|
|
},
|
|
}
|
|
}
|