mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-19 04:53:32 +00:00
Add reply endpoint for http (#1168)
This commit is contained in:
parent
2bfae1baae
commit
01af6aa19c
@ -131,6 +131,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin
|
||||
routes.MetadataRoutes(ginApp)
|
||||
routes.StatusRoutes(ginApp)
|
||||
routes.DbRoutes(ginApp)
|
||||
routes.ReplayRoutes(ginApp)
|
||||
|
||||
return ginApp
|
||||
}
|
||||
|
34
agent/pkg/controllers/replay_controller.go
Normal file
34
agent/pkg/controllers/replay_controller.go
Normal file
@ -0,0 +1,34 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/replay"
|
||||
"github.com/up9inc/mizu/agent/pkg/validation"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
replayTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
func ReplayRequest(c *gin.Context) {
|
||||
logger.Log.Debug("Starting replay")
|
||||
replayDetails := &replay.Details{}
|
||||
if err := c.Bind(replayDetails); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Validating replay, %v", replayDetails)
|
||||
if err := validation.Validate(replayDetails); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log.Debug("Executing replay, %v", replayDetails)
|
||||
result := replay.ExecuteRequest(replayDetails, replayTimeout)
|
||||
c.JSON(http.StatusOK, result)
|
||||
}
|
@ -60,7 +60,7 @@ func GetTappedPodsStatus() []shared.TappedPodStatus {
|
||||
|
||||
func SetNodeToTappedPodMap(nodeToTappedPodsMap shared.NodeToPodsMap) {
|
||||
summary := nodeToTappedPodsMap.Summary()
|
||||
logger.Log.Infof("Setting node to tapped pods map to %v", summary)
|
||||
logger.Log.Debugf("Setting node to tapped pods map to %v", summary)
|
||||
|
||||
nodeHostToTappedPodsMap = nodeToTappedPodsMap
|
||||
}
|
||||
|
167
agent/pkg/replay/replay.go
Normal file
167
agent/pkg/replay/replay.go
Normal file
@ -0,0 +1,167 @@
|
||||
package replay
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/up9inc/mizu/agent/pkg/app"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
mizuhttp "github.com/up9inc/mizu/tap/extensions/http"
|
||||
)
|
||||
|
||||
var (
|
||||
inProcessRequestsLocker = sync.Mutex{}
|
||||
inProcessRequests = 0
|
||||
)
|
||||
|
||||
const maxParallelAction = 5
|
||||
|
||||
type Details struct {
|
||||
Method string `json:"method"`
|
||||
Url string `json:"url"`
|
||||
Body string `json:"body"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Success bool `json:"status"`
|
||||
Data interface{} `json:"data"`
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
}
|
||||
|
||||
func incrementCounter() bool {
|
||||
result := false
|
||||
inProcessRequestsLocker.Lock()
|
||||
if inProcessRequests < maxParallelAction {
|
||||
inProcessRequests++
|
||||
result = true
|
||||
}
|
||||
inProcessRequestsLocker.Unlock()
|
||||
return result
|
||||
}
|
||||
|
||||
func decrementCounter() {
|
||||
inProcessRequestsLocker.Lock()
|
||||
inProcessRequests--
|
||||
inProcessRequestsLocker.Unlock()
|
||||
}
|
||||
|
||||
func getEntryFromRequestResponse(extension *tapApi.Extension, request *http.Request, response *http.Response) *tapApi.Entry {
|
||||
captureTime := time.Now()
|
||||
|
||||
itemTmp := tapApi.OutputChannelItem{
|
||||
Protocol: *extension.Protocol,
|
||||
ConnectionInfo: &tapApi.ConnectionInfo{
|
||||
ClientIP: "",
|
||||
ClientPort: "1",
|
||||
ServerIP: "",
|
||||
ServerPort: "1",
|
||||
IsOutgoing: false,
|
||||
},
|
||||
Capture: "",
|
||||
Timestamp: time.Now().UnixMilli(),
|
||||
Pair: &tapApi.RequestResponsePair{
|
||||
Request: tapApi.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
CaptureSize: 0,
|
||||
Payload: &mizuhttp.HTTPPayload{
|
||||
Type: mizuhttp.TypeHttpRequest,
|
||||
Data: request,
|
||||
},
|
||||
},
|
||||
Response: tapApi.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
CaptureSize: 0,
|
||||
Payload: &mizuhttp.HTTPPayload{
|
||||
Type: mizuhttp.TypeHttpResponse,
|
||||
Data: response,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Analyze is expecting an item that's marshalled and unmarshalled
|
||||
itemMarshalled, err := json.Marshal(itemTmp)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
var finalItem *tapApi.OutputChannelItem
|
||||
if err := json.Unmarshal(itemMarshalled, &finalItem); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return extension.Dissector.Analyze(finalItem, "", "", "")
|
||||
}
|
||||
|
||||
func ExecuteRequest(replayData *Details, timeout time.Duration) *Response {
|
||||
if incrementCounter() {
|
||||
defer decrementCounter()
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body))
|
||||
if err != nil {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
for headerKey, headerValue := range replayData.Headers {
|
||||
request.Header.Add(headerKey, headerValue)
|
||||
}
|
||||
request.Header.Add("x-mizu", uuid.New().String())
|
||||
response, requestErr := client.Do(request)
|
||||
|
||||
if requestErr != nil {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: requestErr.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
extension := app.ExtensionsMap["http"] // # TODO: maybe pass the extension to the function so it can be tested
|
||||
entry := getEntryFromRequestResponse(extension, request, response)
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
var representation []byte
|
||||
representation, err = extension.Dissector.Represent(entry.Request, entry.Response)
|
||||
if err != nil {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
return &Response{
|
||||
Success: true,
|
||||
Data: &tapApi.EntryWrapper{
|
||||
Protocol: *extension.Protocol,
|
||||
Representation: string(representation),
|
||||
Data: entry,
|
||||
Base: base,
|
||||
Rules: nil,
|
||||
IsRulesEnabled: false,
|
||||
},
|
||||
ErrorMessage: "",
|
||||
}
|
||||
} else {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: fmt.Sprintf("reached threshold of %d requests", maxParallelAction),
|
||||
}
|
||||
}
|
||||
}
|
108
agent/pkg/replay/replay_internal_test.go
Normal file
108
agent/pkg/replay/replay_internal_test.go
Normal file
@ -0,0 +1,108 @@
|
||||
package replay
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
"github.com/google/uuid"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
mizuhttp "github.com/up9inc/mizu/tap/extensions/http"
|
||||
)
|
||||
|
||||
func TestValid(t *testing.T) {
|
||||
client := &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
tests := map[string]*Details{
|
||||
"40x": {
|
||||
Method: "GET",
|
||||
Url: "http://httpbin.org/status/404",
|
||||
Body: "",
|
||||
Headers: map[string]string{},
|
||||
},
|
||||
"20x": {
|
||||
Method: "GET",
|
||||
Url: "http://httpbin.org/status/200",
|
||||
Body: "",
|
||||
Headers: map[string]string{},
|
||||
},
|
||||
"50x": {
|
||||
Method: "GET",
|
||||
Url: "http://httpbin.org/status/500",
|
||||
Body: "",
|
||||
Headers: map[string]string{},
|
||||
},
|
||||
// TODO: this should be fixes, currently not working because of header name with ":"
|
||||
//":path-header": {
|
||||
// Method: "GET",
|
||||
// Url: "http://httpbin.org/get",
|
||||
// Body: "",
|
||||
// Headers: map[string]string{
|
||||
// ":path": "/get",
|
||||
// },
|
||||
// },
|
||||
}
|
||||
|
||||
for testCaseName, replayData := range tests {
|
||||
t.Run(fmt.Sprintf("%+v", testCaseName), func(t *testing.T) {
|
||||
request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body))
|
||||
if err != nil {
|
||||
t.Errorf("Error executing request")
|
||||
}
|
||||
|
||||
for headerKey, headerValue := range replayData.Headers {
|
||||
request.Header.Add(headerKey, headerValue)
|
||||
}
|
||||
request.Header.Add("x-mizu", uuid.New().String())
|
||||
response, requestErr := client.Do(request)
|
||||
|
||||
if requestErr != nil {
|
||||
t.Errorf("failed: %v, ", requestErr)
|
||||
}
|
||||
|
||||
extensionHttp := &tapApi.Extension{}
|
||||
dissectorHttp := mizuhttp.NewDissector()
|
||||
dissectorHttp.Register(extensionHttp)
|
||||
extensionHttp.Dissector = dissectorHttp
|
||||
extension := extensionHttp
|
||||
|
||||
entry := getEntryFromRequestResponse(extension, request, response)
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
|
||||
// Represent is expecting an entry that's marshalled and unmarshalled
|
||||
entryMarshalled, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
t.Errorf("failed marshaling entry: %v, ", err)
|
||||
}
|
||||
var entryUnmarshalled *tapApi.Entry
|
||||
if err := json.Unmarshal(entryMarshalled, &entryUnmarshalled); err != nil {
|
||||
t.Errorf("failed unmarshaling entry: %v, ", err)
|
||||
}
|
||||
|
||||
var representation []byte
|
||||
representation, err = extension.Dissector.Represent(entryUnmarshalled.Request, entryUnmarshalled.Response)
|
||||
if err != nil {
|
||||
t.Errorf("failed: %v, ", err)
|
||||
}
|
||||
|
||||
result := &tapApi.EntryWrapper{
|
||||
Protocol: *extension.Protocol,
|
||||
Representation: string(representation),
|
||||
Data: entry,
|
||||
Base: base,
|
||||
Rules: nil,
|
||||
IsRulesEnabled: false,
|
||||
}
|
||||
t.Logf("%+v", result)
|
||||
//data, _ := json.MarshalIndent(result, "", " ")
|
||||
//t.Logf("%+v", string(data))
|
||||
})
|
||||
}
|
||||
}
|
13
agent/pkg/routes/replay_routes.go
Normal file
13
agent/pkg/routes/replay_routes.go
Normal file
@ -0,0 +1,13 @@
|
||||
package routes
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/controllers"
|
||||
)
|
||||
|
||||
// ReplayRoutes defines the group of replay routes.
|
||||
func ReplayRoutes(app *gin.Engine) {
|
||||
routeGroup := app.Group("/replay")
|
||||
|
||||
routeGroup.POST("/", controllers.ReplayRequest)
|
||||
}
|
@ -55,7 +55,6 @@ type WebSocketMessageMetadata struct {
|
||||
MessageType WebSocketMessageType `json:"messageType,omitempty"`
|
||||
}
|
||||
|
||||
|
||||
type WebSocketStatusMessage struct {
|
||||
*WebSocketMessageMetadata
|
||||
TappingStatus []TappedPodStatus `json:"tappingStatus"`
|
||||
|
Loading…
Reference in New Issue
Block a user