diff --git a/agent/main.go b/agent/main.go index 804f53f9f..6e9f9a9d9 100644 --- a/agent/main.go +++ b/agent/main.go @@ -131,6 +131,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin routes.MetadataRoutes(ginApp) routes.StatusRoutes(ginApp) routes.DbRoutes(ginApp) + routes.ReplayRoutes(ginApp) return ginApp } @@ -155,7 +156,7 @@ func runInTapperMode() { hostMode := os.Getenv(shared.HostModeEnvVar) == "1" tapOpts := &tap.TapOpts{ - HostMode: hostMode, + HostMode: hostMode, } filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) diff --git a/agent/pkg/controllers/replay_controller.go b/agent/pkg/controllers/replay_controller.go new file mode 100644 index 000000000..7bbfdbf50 --- /dev/null +++ b/agent/pkg/controllers/replay_controller.go @@ -0,0 +1,34 @@ +package controllers + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/up9inc/mizu/agent/pkg/replay" + "github.com/up9inc/mizu/agent/pkg/validation" + "github.com/up9inc/mizu/logger" +) + +const ( + replayTimeout = 10 * time.Second +) + +func ReplayRequest(c *gin.Context) { + logger.Log.Debug("Starting replay") + replayDetails := &replay.Details{} + if err := c.Bind(replayDetails); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + + logger.Log.Debugf("Validating replay, %v", replayDetails) + if err := validation.Validate(replayDetails); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + + logger.Log.Debug("Executing replay, %v", replayDetails) + result := replay.ExecuteRequest(replayDetails, replayTimeout) + c.JSON(http.StatusOK, result) +} diff --git a/agent/pkg/providers/tappedPods/tapped_pods_provider.go b/agent/pkg/providers/tappedPods/tapped_pods_provider.go index e0ffc0936..db52a718b 100644 --- a/agent/pkg/providers/tappedPods/tapped_pods_provider.go +++ b/agent/pkg/providers/tappedPods/tapped_pods_provider.go @@ -60,7 +60,7 @@ func GetTappedPodsStatus() []shared.TappedPodStatus { func SetNodeToTappedPodMap(nodeToTappedPodsMap shared.NodeToPodsMap) { summary := nodeToTappedPodsMap.Summary() - logger.Log.Infof("Setting node to tapped pods map to %v", summary) + logger.Log.Debugf("Setting node to tapped pods map to %v", summary) nodeHostToTappedPodsMap = nodeToTappedPodsMap } diff --git a/agent/pkg/replay/replay.go b/agent/pkg/replay/replay.go new file mode 100644 index 000000000..9b5ff82b0 --- /dev/null +++ b/agent/pkg/replay/replay.go @@ -0,0 +1,167 @@ +package replay + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/up9inc/mizu/agent/pkg/app" + tapApi "github.com/up9inc/mizu/tap/api" + mizuhttp "github.com/up9inc/mizu/tap/extensions/http" +) + +var ( + inProcessRequestsLocker = sync.Mutex{} + inProcessRequests = 0 +) + +const maxParallelAction = 5 + +type Details struct { + Method string `json:"method"` + Url string `json:"url"` + Body string `json:"body"` + Headers map[string]string `json:"headers"` +} + +type Response struct { + Success bool `json:"status"` + Data interface{} `json:"data"` + ErrorMessage string `json:"errorMessage"` +} + +func incrementCounter() bool { + result := false + inProcessRequestsLocker.Lock() + if inProcessRequests < maxParallelAction { + inProcessRequests++ + result = true + } + inProcessRequestsLocker.Unlock() + return result +} + +func decrementCounter() { + inProcessRequestsLocker.Lock() + inProcessRequests-- + inProcessRequestsLocker.Unlock() +} + +func getEntryFromRequestResponse(extension *tapApi.Extension, request *http.Request, response *http.Response) *tapApi.Entry { + captureTime := time.Now() + + itemTmp := tapApi.OutputChannelItem{ + Protocol: *extension.Protocol, + ConnectionInfo: &tapApi.ConnectionInfo{ + ClientIP: "", + ClientPort: "1", + ServerIP: "", + ServerPort: "1", + IsOutgoing: false, + }, + Capture: "", + Timestamp: time.Now().UnixMilli(), + Pair: &tapApi.RequestResponsePair{ + Request: tapApi.GenericMessage{ + IsRequest: true, + CaptureTime: captureTime, + CaptureSize: 0, + Payload: &mizuhttp.HTTPPayload{ + Type: mizuhttp.TypeHttpRequest, + Data: request, + }, + }, + Response: tapApi.GenericMessage{ + IsRequest: false, + CaptureTime: captureTime, + CaptureSize: 0, + Payload: &mizuhttp.HTTPPayload{ + Type: mizuhttp.TypeHttpResponse, + Data: response, + }, + }, + }, + } + + // Analyze is expecting an item that's marshalled and unmarshalled + itemMarshalled, err := json.Marshal(itemTmp) + if err != nil { + return nil + } + var finalItem *tapApi.OutputChannelItem + if err := json.Unmarshal(itemMarshalled, &finalItem); err != nil { + return nil + } + + return extension.Dissector.Analyze(finalItem, "", "", "") +} + +func ExecuteRequest(replayData *Details, timeout time.Duration) *Response { + if incrementCounter() { + defer decrementCounter() + + client := &http.Client{ + Timeout: timeout, + } + + request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body)) + if err != nil { + return &Response{ + Success: false, + Data: nil, + ErrorMessage: err.Error(), + } + } + + for headerKey, headerValue := range replayData.Headers { + request.Header.Add(headerKey, headerValue) + } + request.Header.Add("x-mizu", uuid.New().String()) + response, requestErr := client.Do(request) + + if requestErr != nil { + return &Response{ + Success: false, + Data: nil, + ErrorMessage: requestErr.Error(), + } + } + + extension := app.ExtensionsMap["http"] // # TODO: maybe pass the extension to the function so it can be tested + entry := getEntryFromRequestResponse(extension, request, response) + base := extension.Dissector.Summarize(entry) + var representation []byte + representation, err = extension.Dissector.Represent(entry.Request, entry.Response) + if err != nil { + return &Response{ + Success: false, + Data: nil, + ErrorMessage: err.Error(), + } + } + + return &Response{ + Success: true, + Data: &tapApi.EntryWrapper{ + Protocol: *extension.Protocol, + Representation: string(representation), + Data: entry, + Base: base, + Rules: nil, + IsRulesEnabled: false, + }, + ErrorMessage: "", + } + } else { + return &Response{ + Success: false, + Data: nil, + ErrorMessage: fmt.Sprintf("reached threshold of %d requests", maxParallelAction), + } + } +} diff --git a/agent/pkg/replay/replay_internal_test.go b/agent/pkg/replay/replay_internal_test.go new file mode 100644 index 000000000..4fa23ac8e --- /dev/null +++ b/agent/pkg/replay/replay_internal_test.go @@ -0,0 +1,108 @@ +package replay + +import ( + "bytes" + "fmt" + "net/http" + "strings" + "testing" + "time" + + "encoding/json" + + "github.com/google/uuid" + tapApi "github.com/up9inc/mizu/tap/api" + mizuhttp "github.com/up9inc/mizu/tap/extensions/http" +) + +func TestValid(t *testing.T) { + client := &http.Client{ + Timeout: 10 * time.Second, + } + + tests := map[string]*Details{ + "40x": { + Method: "GET", + Url: "http://httpbin.org/status/404", + Body: "", + Headers: map[string]string{}, + }, + "20x": { + Method: "GET", + Url: "http://httpbin.org/status/200", + Body: "", + Headers: map[string]string{}, + }, + "50x": { + Method: "GET", + Url: "http://httpbin.org/status/500", + Body: "", + Headers: map[string]string{}, + }, + // TODO: this should be fixes, currently not working because of header name with ":" + //":path-header": { + // Method: "GET", + // Url: "http://httpbin.org/get", + // Body: "", + // Headers: map[string]string{ + // ":path": "/get", + // }, + // }, + } + + for testCaseName, replayData := range tests { + t.Run(fmt.Sprintf("%+v", testCaseName), func(t *testing.T) { + request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body)) + if err != nil { + t.Errorf("Error executing request") + } + + for headerKey, headerValue := range replayData.Headers { + request.Header.Add(headerKey, headerValue) + } + request.Header.Add("x-mizu", uuid.New().String()) + response, requestErr := client.Do(request) + + if requestErr != nil { + t.Errorf("failed: %v, ", requestErr) + } + + extensionHttp := &tapApi.Extension{} + dissectorHttp := mizuhttp.NewDissector() + dissectorHttp.Register(extensionHttp) + extensionHttp.Dissector = dissectorHttp + extension := extensionHttp + + entry := getEntryFromRequestResponse(extension, request, response) + base := extension.Dissector.Summarize(entry) + + // Represent is expecting an entry that's marshalled and unmarshalled + entryMarshalled, err := json.Marshal(entry) + if err != nil { + t.Errorf("failed marshaling entry: %v, ", err) + } + var entryUnmarshalled *tapApi.Entry + if err := json.Unmarshal(entryMarshalled, &entryUnmarshalled); err != nil { + t.Errorf("failed unmarshaling entry: %v, ", err) + } + + var representation []byte + representation, err = extension.Dissector.Represent(entryUnmarshalled.Request, entryUnmarshalled.Response) + if err != nil { + t.Errorf("failed: %v, ", err) + } + + result := &tapApi.EntryWrapper{ + Protocol: *extension.Protocol, + Representation: string(representation), + Data: entry, + Base: base, + Rules: nil, + IsRulesEnabled: false, + } + t.Logf("%+v", result) + //data, _ := json.MarshalIndent(result, "", " ") + //t.Logf("%+v", string(data)) + }) + } +} diff --git a/agent/pkg/routes/replay_routes.go b/agent/pkg/routes/replay_routes.go new file mode 100644 index 000000000..d585e8232 --- /dev/null +++ b/agent/pkg/routes/replay_routes.go @@ -0,0 +1,13 @@ +package routes + +import ( + "github.com/gin-gonic/gin" + "github.com/up9inc/mizu/agent/pkg/controllers" +) + +// ReplayRoutes defines the group of replay routes. +func ReplayRoutes(app *gin.Engine) { + routeGroup := app.Group("/replay") + + routeGroup.POST("/", controllers.ReplayRequest) +} diff --git a/shared/models.go b/shared/models.go index 6991d5cd2..00170409f 100644 --- a/shared/models.go +++ b/shared/models.go @@ -55,7 +55,6 @@ type WebSocketMessageMetadata struct { MessageType WebSocketMessageType `json:"messageType,omitempty"` } - type WebSocketStatusMessage struct { *WebSocketMessageMetadata TappingStatus []TappedPodStatus `json:"tappingStatus"`