mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-13 22:27:12 +00:00
* Removed policy rules (validation rules) feature * updated test pcap * Remove rules * fix replay in rules Co-authored-by: Roy Island <roy@up9.com> Co-authored-by: RoyUP9 <87927115+RoyUP9@users.noreply.github.com> Co-authored-by: Roee Gadot <roee.gadot@up9.com>
185 lines
4.4 KiB
Go
185 lines
4.4 KiB
Go
package replay
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/up9inc/mizu/agent/pkg/app"
|
|
tapApi "github.com/up9inc/mizu/tap/api"
|
|
mizuhttp "github.com/up9inc/mizu/tap/extensions/http"
|
|
)
|
|
|
|
var (
|
|
inProcessRequestsLocker = sync.Mutex{}
|
|
inProcessRequests = 0
|
|
)
|
|
|
|
const maxParallelAction = 5
|
|
|
|
type Details struct {
|
|
Method string `json:"method"`
|
|
Url string `json:"url"`
|
|
Body string `json:"body"`
|
|
Headers map[string]string `json:"headers"`
|
|
}
|
|
|
|
type Response struct {
|
|
Success bool `json:"status"`
|
|
Data interface{} `json:"data"`
|
|
ErrorMessage string `json:"errorMessage"`
|
|
}
|
|
|
|
func incrementCounter() bool {
|
|
result := false
|
|
inProcessRequestsLocker.Lock()
|
|
if inProcessRequests < maxParallelAction {
|
|
inProcessRequests++
|
|
result = true
|
|
}
|
|
inProcessRequestsLocker.Unlock()
|
|
return result
|
|
}
|
|
|
|
func decrementCounter() {
|
|
inProcessRequestsLocker.Lock()
|
|
inProcessRequests--
|
|
inProcessRequestsLocker.Unlock()
|
|
}
|
|
|
|
func getEntryFromRequestResponse(extension *tapApi.Extension, request *http.Request, response *http.Response) *tapApi.Entry {
|
|
captureTime := time.Now()
|
|
|
|
itemTmp := tapApi.OutputChannelItem{
|
|
Protocol: *extension.Protocol,
|
|
ConnectionInfo: &tapApi.ConnectionInfo{
|
|
ClientIP: "",
|
|
ClientPort: "1",
|
|
ServerIP: "",
|
|
ServerPort: "1",
|
|
IsOutgoing: false,
|
|
},
|
|
Capture: "",
|
|
Timestamp: time.Now().UnixMilli(),
|
|
Pair: &tapApi.RequestResponsePair{
|
|
Request: tapApi.GenericMessage{
|
|
IsRequest: true,
|
|
CaptureTime: captureTime,
|
|
CaptureSize: 0,
|
|
Payload: &mizuhttp.HTTPPayload{
|
|
Type: mizuhttp.TypeHttpRequest,
|
|
Data: request,
|
|
},
|
|
},
|
|
Response: tapApi.GenericMessage{
|
|
IsRequest: false,
|
|
CaptureTime: captureTime,
|
|
CaptureSize: 0,
|
|
Payload: &mizuhttp.HTTPPayload{
|
|
Type: mizuhttp.TypeHttpResponse,
|
|
Data: response,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Analyze is expecting an item that's marshalled and unmarshalled
|
|
itemMarshalled, err := json.Marshal(itemTmp)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
var finalItem *tapApi.OutputChannelItem
|
|
if err := json.Unmarshal(itemMarshalled, &finalItem); err != nil {
|
|
return nil
|
|
}
|
|
|
|
return extension.Dissector.Analyze(finalItem, "", "", "")
|
|
}
|
|
|
|
func ExecuteRequest(replayData *Details, timeout time.Duration) *Response {
|
|
if incrementCounter() {
|
|
defer decrementCounter()
|
|
|
|
client := &http.Client{
|
|
Timeout: timeout,
|
|
}
|
|
|
|
request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body))
|
|
if err != nil {
|
|
return &Response{
|
|
Success: false,
|
|
Data: nil,
|
|
ErrorMessage: err.Error(),
|
|
}
|
|
}
|
|
|
|
for headerKey, headerValue := range replayData.Headers {
|
|
request.Header.Add(headerKey, headerValue)
|
|
}
|
|
request.Header.Add("x-mizu", uuid.New().String())
|
|
response, requestErr := client.Do(request)
|
|
|
|
if requestErr != nil {
|
|
return &Response{
|
|
Success: false,
|
|
Data: nil,
|
|
ErrorMessage: requestErr.Error(),
|
|
}
|
|
}
|
|
|
|
extension := app.ExtensionsMap["http"] // # TODO: maybe pass the extension to the function so it can be tested
|
|
entry := getEntryFromRequestResponse(extension, request, response)
|
|
base := extension.Dissector.Summarize(entry)
|
|
var representation []byte
|
|
|
|
// Represent is expecting an entry that's marshalled and unmarshalled
|
|
entryMarshalled, err := json.Marshal(entry)
|
|
if err != nil {
|
|
return &Response{
|
|
Success: false,
|
|
Data: nil,
|
|
ErrorMessage: err.Error(),
|
|
}
|
|
}
|
|
var entryUnmarshalled *tapApi.Entry
|
|
if err := json.Unmarshal(entryMarshalled, &entryUnmarshalled); err != nil {
|
|
return &Response{
|
|
Success: false,
|
|
Data: nil,
|
|
ErrorMessage: err.Error(),
|
|
}
|
|
}
|
|
|
|
representation, err = extension.Dissector.Represent(entryUnmarshalled.Request, entryUnmarshalled.Response)
|
|
if err != nil {
|
|
return &Response{
|
|
Success: false,
|
|
Data: nil,
|
|
ErrorMessage: err.Error(),
|
|
}
|
|
}
|
|
|
|
return &Response{
|
|
Success: true,
|
|
Data: &tapApi.EntryWrapper{
|
|
Protocol: *extension.Protocol,
|
|
Representation: string(representation),
|
|
Data: entryUnmarshalled,
|
|
Base: base,
|
|
},
|
|
ErrorMessage: "",
|
|
}
|
|
} else {
|
|
return &Response{
|
|
Success: false,
|
|
Data: nil,
|
|
ErrorMessage: fmt.Sprintf("reached threshold of %d requests", maxParallelAction),
|
|
}
|
|
}
|
|
}
|