diff --git a/acceptanceTests/tap_test.go b/acceptanceTests/tap_test.go index 3c359a0d0..1e6ca9a4f 100644 --- a/acceptanceTests/tap_test.go +++ b/acceptanceTests/tap_test.go @@ -13,22 +13,22 @@ func TestTapAndFetch(t *testing.T) { t.Skip("ignored acceptance test") } - tests := []int{1, 100} + tests := []int{50} for _, entriesCount := range tests { t.Run(fmt.Sprintf("%d", entriesCount), func(t *testing.T) { - cliPath, cliPathErr := GetCliPath() + cliPath, cliPathErr := getCliPath() if cliPathErr != nil { t.Errorf("failed to get cli path, err: %v", cliPathErr) return } - tapCmdArgs := GetDefaultTapCommandArgs() + tapCmdArgs := getDefaultTapCommandArgs() tapCmd := exec.Command(cliPath, tapCmdArgs...) t.Logf("running command: %v", tapCmd.String()) t.Cleanup(func() { - if err := CleanupCommand(tapCmd); err != nil { + if err := cleanupCommand(tapCmd); err != nil { t.Logf("failed to cleanup tap command, err: %v", err) } }) @@ -38,86 +38,87 @@ func TestTapAndFetch(t *testing.T) { return } - time.Sleep(30 * time.Second) + if err := waitTapPodsReady(); err != nil { + t.Errorf("failed to start tap pods on time, err: %v", err) + return + } proxyUrl := "http://localhost:8080/api/v1/namespaces/mizu-tests/services/httpbin/proxy/get" for i := 0; i < entriesCount; i++ { - if _, requestErr := ExecuteHttpRequest(proxyUrl); requestErr != nil { + if _, requestErr := executeHttpRequest(proxyUrl); requestErr != nil { t.Errorf("failed to send proxy request, err: %v", requestErr) return } } - time.Sleep(5 * time.Second) - timestamp := time.Now().UnixNano() / int64(time.Millisecond) + entriesCheckFunc := func() error { + timestamp := time.Now().UnixNano() / int64(time.Millisecond) - entriesUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries?limit=%v&operator=lt×tamp=%v", entriesCount, timestamp) - requestResult, requestErr := ExecuteHttpRequest(entriesUrl) - if requestErr != nil { - t.Errorf("failed to get entries, err: %v", requestErr) + entriesUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries?limit=%v&operator=lt×tamp=%v", entriesCount, timestamp) + requestResult, requestErr := executeHttpRequest(entriesUrl) + if requestErr != nil { + return fmt.Errorf("failed to get entries, err: %v", requestErr) + } + + entries, ok := requestResult.([]interface{}) + if !ok { + return fmt.Errorf("invalid entries type") + } + + if len(entries) == 0 { + return fmt.Errorf("unexpected entries result - Expected more than 0 entries") + } + + entry, ok := entries[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid entry type") + } + + entryUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries/%v", entry["id"]) + requestResult, requestErr = executeHttpRequest(entryUrl) + if requestErr != nil { + return fmt.Errorf("failed to get entry, err: %v", requestErr) + } + + if requestResult == nil { + return fmt.Errorf("unexpected nil entry result") + } + + return nil + } + if err := retriesExecute(ShortRetriesCount, entriesCheckFunc); err != nil { + t.Errorf("%v", err) return } - entries, ok := requestResult.([]interface{}) - if !ok { - t.Errorf("invalid entries type") - return - } - - if len(entries) != entriesCount { - t.Errorf("unexpected entries result - Expected: %v, actual: %v", entriesCount, len(entries)) - return - } - - entry, ok := entries[0].(map[string]interface{}) - if !ok { - t.Errorf("invalid entry type") - return - } - - entryUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries/%v", entry["id"]) - requestResult, requestErr = ExecuteHttpRequest(entryUrl) - if requestErr != nil { - t.Errorf("failed to get entry, err: %v", requestErr) - return - } - - if requestResult == nil { - t.Errorf("unexpected nil entry result") - return - } - - fetchCmdArgs := GetDefaultFetchCommandArgs() + fetchCmdArgs := getDefaultFetchCommandArgs() fetchCmd := exec.Command(cliPath, fetchCmdArgs...) t.Logf("running command: %v", fetchCmd.String()) - t.Cleanup(func() { - if err := CleanupCommand(fetchCmd); err != nil { - t.Logf("failed to cleanup fetch command, err: %v", err) - } - }) - if err := fetchCmd.Start(); err != nil { t.Errorf("failed to start fetch command, err: %v", err) return } - time.Sleep(5 * time.Second) + harCheckFunc := func() error { + harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har") + if readFileErr != nil { + return fmt.Errorf("failed to read har file, err: %v", readFileErr) + } - harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har") - if readFileErr != nil { - t.Errorf("failed to read har file, err: %v", readFileErr) - return + harEntries, err := getEntriesFromHarBytes(harBytes) + if err != nil { + return fmt.Errorf("failed to get entries from har, err: %v", err) + } + + if len(harEntries) == 0 { + return fmt.Errorf("unexpected har entries result - Expected more than 0 entries") + } + + return nil } - - harEntries, err := GetEntriesFromHarBytes(harBytes) - if err != nil { - t.Errorf("failed to get entries from har, err: %v", err) - return - } - - if len(harEntries) != entriesCount { - t.Errorf("unexpected har entries result - Expected: %v, actual: %v", entriesCount, len(harEntries)) + if err := retriesExecute(ShortRetriesCount, harCheckFunc); err != nil { + t.Errorf("%v", err) return } }) diff --git a/acceptanceTests/testsUtils.go b/acceptanceTests/testsUtils.go index 123151ac3..edb2e8556 100644 --- a/acceptanceTests/testsUtils.go +++ b/acceptanceTests/testsUtils.go @@ -10,9 +10,15 @@ import ( "os/exec" "path" "syscall" + "time" ) -func GetCliPath() (string, error) { +const ( + LongRetriesCount = 100 + ShortRetriesCount = 10 +) + +func getCliPath() (string, error) { dir, filePathErr := os.Getwd() if filePathErr != nil { return "", filePathErr @@ -22,34 +28,74 @@ func GetCliPath() (string, error) { return cliPath, nil } -func GetDefaultCommandArgs() []string { +func getDefaultCommandArgs() []string { setFlag := "--set" telemetry := "telemetry=false" return []string{setFlag, telemetry} } -func GetDefaultTapCommandArgs() []string { +func getDefaultTapCommandArgs() []string { tapCommand := "tap" setFlag := "--set" namespaces := "tap.namespaces=mizu-tests" agentImage := "agent-image=gcr.io/up9-docker-hub/mizu/ci:0.0.0" imagePullPolicy := "image-pull-policy=Never" - defaultCmdArgs := GetDefaultCommandArgs() + defaultCmdArgs := getDefaultCommandArgs() return append([]string{tapCommand, setFlag, namespaces, setFlag, agentImage, setFlag, imagePullPolicy}, defaultCmdArgs...) } -func GetDefaultFetchCommandArgs() []string { +func getDefaultFetchCommandArgs() []string { tapCommand := "fetch" - defaultCmdArgs := GetDefaultCommandArgs() + defaultCmdArgs := getDefaultCommandArgs() return append([]string{tapCommand}, defaultCmdArgs...) } -func JsonBytesToInterface(jsonBytes []byte) (interface{}, error) { +func retriesExecute(retriesCount int, executeFunc func() error) error { + var lastError error + + for i := 0; i < retriesCount; i++ { + if err := executeFunc(); err != nil { + lastError = err + + time.Sleep(1 * time.Second) + continue + } + + return nil + } + + return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError) +} + +func waitTapPodsReady() error { + resolvingUrl := fmt.Sprintf("http://localhost:8899/mizu/status/tappersCount") + tapPodsReadyFunc := func() error { + requestResult, requestErr := executeHttpRequest(resolvingUrl) + if requestErr != nil { + return requestErr + } + + tappersCount, ok := requestResult.(float64) + if !ok { + return fmt.Errorf("invalid tappers count type") + } + + if tappersCount == 0 { + return fmt.Errorf("no tappers running") + } + + return nil + } + + return retriesExecute(LongRetriesCount, tapPodsReadyFunc) +} + +func jsonBytesToInterface(jsonBytes []byte) (interface{}, error) { var result interface{} if parseErr := json.Unmarshal(jsonBytes, &result); parseErr != nil { return nil, parseErr @@ -58,7 +104,7 @@ func JsonBytesToInterface(jsonBytes []byte) (interface{}, error) { return result, nil } -func ExecuteHttpRequest(url string) (interface{}, error) { +func executeHttpRequest(url string) (interface{}, error) { response, requestErr := http.Get(url) if requestErr != nil { return nil, requestErr @@ -66,15 +112,17 @@ func ExecuteHttpRequest(url string) (interface{}, error) { return nil, fmt.Errorf("invalid status code %v", response.StatusCode) } + defer func() { response.Body.Close() }() + data, readErr := ioutil.ReadAll(response.Body) if readErr != nil { return nil, readErr } - return JsonBytesToInterface(data) + return jsonBytesToInterface(data) } -func CleanupCommand(cmd *exec.Cmd) error { +func cleanupCommand(cmd *exec.Cmd) error { if err := cmd.Process.Signal(syscall.SIGQUIT); err != nil { return err } @@ -86,8 +134,8 @@ func CleanupCommand(cmd *exec.Cmd) error { return nil } -func GetEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){ - harInterface, convertErr := JsonBytesToInterface(harBytes) +func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){ + harInterface, convertErr := jsonBytesToInterface(harBytes) if convertErr != nil { return nil, convertErr } @@ -97,14 +145,12 @@ func GetEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){ return nil, errors.New("invalid har type") } - harLogInterface := har["log"] - harLog, ok := harLogInterface.(map[string]interface{}) + harLog, ok := har["log"].(map[string]interface{}) if !ok { return nil, errors.New("invalid har log type") } - harEntriesInterface := harLog["entries"] - harEntries, ok := harEntriesInterface.([]interface{}) + harEntries, ok := harLog["entries"].([]interface{}) if !ok { return nil, errors.New("invalid har entries type") } diff --git a/agent/pkg/api/socket_server_handlers.go b/agent/pkg/api/socket_server_handlers.go index f6b4627c8..0e6a9be57 100644 --- a/agent/pkg/api/socket_server_handlers.go +++ b/agent/pkg/api/socket_server_handlers.go @@ -28,6 +28,7 @@ func init() { func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { if isTapper { rlog.Infof("Websocket event - Tapper connected, socket ID: %d", socketId) + providers.TapperAdded() } else { rlog.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId) socketListLock.Lock() @@ -39,6 +40,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) { if isTapper { rlog.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId) + providers.TapperRemoved() } else { rlog.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId) socketListLock.Lock() diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 5e046ee82..e7f5bbc32 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -30,3 +30,7 @@ func PostTappedPods(c *gin.Context) { api.BroadcastToBrowserClients(jsonBytes) } } + +func GetTappersCount(c *gin.Context) { + c.JSON(http.StatusOK, providers.TappersCount) +} diff --git a/agent/pkg/providers/status_provider.go b/agent/pkg/providers/status_provider.go index 4a6b14468..b8971ee4b 100644 --- a/agent/pkg/providers/status_provider.go +++ b/agent/pkg/providers/status_provider.go @@ -4,14 +4,18 @@ import ( "github.com/patrickmn/go-cache" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap" + "sync" "time" ) const tlsLinkRetainmentTime = time.Minute * 15 var ( - TapStatus shared.TapStatus + TappersCount int + TapStatus shared.TapStatus RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) + + tappersCountLock = sync.Mutex{} ) func GetAllRecentTLSAddresses() []string { @@ -26,3 +30,15 @@ func GetAllRecentTLSAddresses() []string { return recentTLSLinks } + +func TapperAdded() { + tappersCountLock.Lock() + TappersCount++ + tappersCountLock.Unlock() +} + +func TapperRemoved() { + tappersCountLock.Lock() + TappersCount-- + tappersCountLock.Unlock() +} diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go index a6aa41151..4271332b4 100644 --- a/agent/pkg/routes/status_routes.go +++ b/agent/pkg/routes/status_routes.go @@ -9,4 +9,6 @@ func StatusRoutes(ginApp *gin.Engine) { routeGroup := ginApp.Group("/status") routeGroup.POST("/tappedPods", controllers.PostTappedPods) + + routeGroup.GET("/tappersCount", controllers.GetTappersCount) }