mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-11 13:23:03 +00:00
424 lines
9.8 KiB
Go
424 lines
9.8 KiB
Go
package acceptanceTests
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/up9inc/mizu/shared"
|
|
)
|
|
|
|
const (
|
|
longRetriesCount = 100
|
|
shortRetriesCount = 10
|
|
defaultApiServerPort = shared.DefaultApiServerPort
|
|
defaultNamespaceName = "mizu-tests"
|
|
defaultServiceName = "httpbin"
|
|
defaultEntriesCount = 50
|
|
waitAfterTapPodsReady = 3 * time.Second
|
|
cleanCommandTimeout = 1 * time.Minute
|
|
)
|
|
|
|
type PodDescriptor struct {
|
|
Name string
|
|
Namespace string
|
|
}
|
|
|
|
func isPodDescriptorInPodArray(pods []map[string]interface{}, podDescriptor PodDescriptor) bool {
|
|
for _, pod := range pods {
|
|
podNamespace := pod["namespace"].(string)
|
|
podName := pod["name"].(string)
|
|
|
|
if podDescriptor.Namespace == podNamespace && strings.Contains(podName, podDescriptor.Name) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func getCliPath() (string, error) {
|
|
dir, filePathErr := os.Getwd()
|
|
if filePathErr != nil {
|
|
return "", filePathErr
|
|
}
|
|
|
|
cliPath := path.Join(dir, "../cli/bin/mizu_ci")
|
|
return cliPath, nil
|
|
}
|
|
|
|
func getMizuFolderPath() (string, error) {
|
|
home, homeDirErr := os.UserHomeDir()
|
|
if homeDirErr != nil {
|
|
return "", homeDirErr
|
|
}
|
|
|
|
return path.Join(home, ".mizu"), nil
|
|
}
|
|
|
|
func getConfigPath() (string, error) {
|
|
mizuFolderPath, mizuPathError := getMizuFolderPath()
|
|
if mizuPathError != nil {
|
|
return "", mizuPathError
|
|
}
|
|
|
|
return path.Join(mizuFolderPath, "config.yaml"), nil
|
|
}
|
|
|
|
func getProxyUrl(namespace string, service string) string {
|
|
return fmt.Sprintf("http://localhost:8080/api/v1/namespaces/%v/services/%v/proxy", namespace, service)
|
|
}
|
|
|
|
func getApiServerUrl(port uint16) string {
|
|
return fmt.Sprintf("http://localhost:%v", port)
|
|
}
|
|
|
|
func getWebSocketUrl(port uint16) string {
|
|
return fmt.Sprintf("ws://localhost:%v/ws", port)
|
|
}
|
|
|
|
func getDefaultCommandArgs() []string {
|
|
setFlag := "--set"
|
|
telemetry := "telemetry=false"
|
|
agentImage := "agent-image=gcr.io/up9-docker-hub/mizu/ci:0.0.0"
|
|
imagePullPolicy := "image-pull-policy=IfNotPresent"
|
|
headless := "headless=true"
|
|
|
|
return []string{setFlag, telemetry, setFlag, agentImage, setFlag, imagePullPolicy, setFlag, headless}
|
|
}
|
|
|
|
func getDefaultTapCommandArgs() []string {
|
|
tapCommand := "tap"
|
|
defaultCmdArgs := getDefaultCommandArgs()
|
|
|
|
return append([]string{tapCommand}, defaultCmdArgs...)
|
|
}
|
|
|
|
func getDefaultTapCommandArgsWithRegex(regex string) []string {
|
|
tapCommand := "tap"
|
|
defaultCmdArgs := getDefaultCommandArgs()
|
|
|
|
return append([]string{tapCommand, regex}, defaultCmdArgs...)
|
|
}
|
|
|
|
func getDefaultLogsCommandArgs() []string {
|
|
logsCommand := "logs"
|
|
defaultCmdArgs := getDefaultCommandArgs()
|
|
|
|
return append([]string{logsCommand}, defaultCmdArgs...)
|
|
}
|
|
|
|
func getDefaultTapNamespace() []string {
|
|
return []string{"-n", "mizu-tests"}
|
|
}
|
|
|
|
func getDefaultConfigCommandArgs() []string {
|
|
configCommand := "config"
|
|
defaultCmdArgs := getDefaultCommandArgs()
|
|
|
|
return append([]string{configCommand}, defaultCmdArgs...)
|
|
}
|
|
|
|
func getDefaultCleanCommandArgs() []string {
|
|
cleanCommand := "clean"
|
|
defaultCmdArgs := getDefaultCommandArgs()
|
|
|
|
return append([]string{cleanCommand}, defaultCmdArgs...)
|
|
}
|
|
|
|
func getDefaultViewCommandArgs() []string {
|
|
viewCommand := "view"
|
|
defaultCmdArgs := getDefaultCommandArgs()
|
|
|
|
return append([]string{viewCommand}, defaultCmdArgs...)
|
|
}
|
|
|
|
func runCypressTests(t *testing.T, cypressRunCmd string) {
|
|
cypressCmd := exec.Command("bash", "-c", cypressRunCmd)
|
|
t.Logf("running command: %v", cypressCmd.String())
|
|
out, err := cypressCmd.Output()
|
|
if err != nil {
|
|
t.Errorf("%s", out)
|
|
return
|
|
}
|
|
t.Logf("%s", out)
|
|
}
|
|
|
|
func retriesExecute(retriesCount int, executeFunc func() error) error {
|
|
var lastError interface{}
|
|
|
|
for i := 0; i < retriesCount; i++ {
|
|
if err := tryExecuteFunc(executeFunc); err != nil {
|
|
lastError = err
|
|
|
|
time.Sleep(1 * time.Second)
|
|
continue
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
|
|
}
|
|
|
|
func tryExecuteFunc(executeFunc func() error) (err interface{}) {
|
|
defer func() {
|
|
if panicErr := recover(); panicErr != nil {
|
|
err = panicErr
|
|
}
|
|
}()
|
|
|
|
return executeFunc()
|
|
}
|
|
|
|
func waitTapPodsReady(apiServerUrl string) error {
|
|
resolvingUrl := fmt.Sprintf("%v/status/connectedTappersCount", apiServerUrl)
|
|
tapPodsReadyFunc := func() error {
|
|
requestResult, requestErr := executeHttpGetRequest(resolvingUrl)
|
|
if requestErr != nil {
|
|
return requestErr
|
|
}
|
|
|
|
connectedTappersCount := requestResult.(float64)
|
|
if connectedTappersCount == 0 {
|
|
return fmt.Errorf("no connected tappers running")
|
|
}
|
|
time.Sleep(waitAfterTapPodsReady)
|
|
return nil
|
|
}
|
|
|
|
return retriesExecute(longRetriesCount, tapPodsReadyFunc)
|
|
}
|
|
|
|
func jsonBytesToInterface(jsonBytes []byte) (interface{}, error) {
|
|
var result interface{}
|
|
if parseErr := json.Unmarshal(jsonBytes, &result); parseErr != nil {
|
|
return nil, parseErr
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func executeHttpRequest(response *http.Response, requestErr error) (interface{}, error) {
|
|
if requestErr != nil {
|
|
return nil, requestErr
|
|
} else if response.StatusCode != 200 {
|
|
return nil, fmt.Errorf("invalid status code %v", response.StatusCode)
|
|
}
|
|
|
|
defer func() { response.Body.Close() }()
|
|
|
|
data, readErr := ioutil.ReadAll(response.Body)
|
|
if readErr != nil {
|
|
return nil, readErr
|
|
}
|
|
|
|
return jsonBytesToInterface(data)
|
|
}
|
|
|
|
func executeHttpGetRequestWithHeaders(url string, headers map[string]string) (interface{}, error) {
|
|
request, err := http.NewRequest(http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for headerKey, headerValue := range headers {
|
|
request.Header.Add(headerKey, headerValue)
|
|
}
|
|
|
|
client := &http.Client{}
|
|
response, requestErr := client.Do(request)
|
|
return executeHttpRequest(response, requestErr)
|
|
}
|
|
|
|
func executeHttpGetRequest(url string) (interface{}, error) {
|
|
response, requestErr := http.Get(url)
|
|
return executeHttpRequest(response, requestErr)
|
|
}
|
|
|
|
func executeHttpPostRequestWithHeaders(url string, headers map[string]string, body interface{}) (interface{}, error) {
|
|
requestBody, jsonErr := json.Marshal(body)
|
|
if jsonErr != nil {
|
|
return nil, jsonErr
|
|
}
|
|
|
|
request, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(requestBody))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
request.Header.Add("Content-Type", "application/json")
|
|
for headerKey, headerValue := range headers {
|
|
request.Header.Add(headerKey, headerValue)
|
|
}
|
|
|
|
client := &http.Client{}
|
|
response, requestErr := client.Do(request)
|
|
return executeHttpRequest(response, requestErr)
|
|
}
|
|
|
|
func runMizuClean() error {
|
|
cliPath, err := getCliPath()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cleanCmdArgs := getDefaultCleanCommandArgs()
|
|
|
|
cleanCmd := exec.Command(cliPath, cleanCmdArgs...)
|
|
|
|
commandDone := make(chan error)
|
|
go func() {
|
|
if err := cleanCmd.Run(); err != nil {
|
|
commandDone <- err
|
|
}
|
|
commandDone <- nil
|
|
}()
|
|
|
|
select {
|
|
case err = <-commandDone:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case <-time.After(cleanCommandTimeout):
|
|
return errors.New("clean command timed out")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func cleanupCommand(cmd *exec.Cmd) error {
|
|
if err := cmd.Process.Signal(syscall.SIGQUIT); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := cmd.Wait(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
|
|
tapPodsInterface := tapStatusInterface.([]interface{})
|
|
|
|
var pods []map[string]interface{}
|
|
for _, podInterface := range tapPodsInterface {
|
|
pods = append(pods, podInterface.(map[string]interface{}))
|
|
}
|
|
|
|
return pods, nil
|
|
}
|
|
|
|
func getLogsPath() (string, error) {
|
|
dir, filePathErr := os.Getwd()
|
|
if filePathErr != nil {
|
|
return "", filePathErr
|
|
}
|
|
|
|
logsPath := path.Join(dir, "mizu_logs.zip")
|
|
return logsPath, nil
|
|
}
|
|
|
|
// waitTimeout waits for the waitgroup for the specified max timeout.
|
|
// Returns true if waiting timed out.
|
|
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
|
|
channel := make(chan struct{})
|
|
go func() {
|
|
defer close(channel)
|
|
wg.Wait()
|
|
}()
|
|
select {
|
|
case <-channel:
|
|
return false // completed normally
|
|
case <-time.After(timeout):
|
|
return true // timed out
|
|
}
|
|
}
|
|
|
|
// checkEntriesAtLeast checks whether the number of entries greater than or equal to n
|
|
func checkEntriesAtLeast(entries []map[string]interface{}, n int) error {
|
|
if len(entries) < n {
|
|
return fmt.Errorf("Unexpected entries result - Expected more than %d entries", n-1)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getDBEntries retrieves the entries from the database before the given timestamp.
|
|
// Also limits the results according to the limit parameter.
|
|
// Timeout for the WebSocket connection is defined by the timeout parameter.
|
|
func getDBEntries(timestamp int64, limit int, timeout time.Duration) (entries []map[string]interface{}, err error) {
|
|
query := fmt.Sprintf("timestamp < %d and limit(%d)", timestamp, limit)
|
|
webSocketUrl := getWebSocketUrl(defaultApiServerPort)
|
|
|
|
var connection *websocket.Conn
|
|
connection, _, err = websocket.DefaultDialer.Dial(webSocketUrl, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer connection.Close()
|
|
|
|
handleWSConnection := func(wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
for {
|
|
_, message, err := connection.ReadMessage()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var data map[string]interface{}
|
|
if err = json.Unmarshal([]byte(message), &data); err != nil {
|
|
return
|
|
}
|
|
|
|
if data["messageType"] == "entry" {
|
|
entries = append(entries, data)
|
|
}
|
|
}
|
|
}
|
|
|
|
err = connection.WriteMessage(websocket.TextMessage, []byte(query))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
go handleWSConnection(&wg)
|
|
wg.Add(1)
|
|
|
|
waitTimeout(&wg, timeout)
|
|
|
|
return
|
|
}
|
|
|
|
func Contains(slice []string, containsValue string) bool {
|
|
for _, sliceValue := range slice {
|
|
if sliceValue == containsValue {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func ContainsPartOfValue(slice []string, containsValue string) bool {
|
|
for _, sliceValue := range slice {
|
|
if strings.Contains(sliceValue, containsValue) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|