mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-10 06:50:54 +00:00
added tapper count route and wait time for tappers in test (#226)
This commit is contained in:
@@ -13,22 +13,22 @@ func TestTapAndFetch(t *testing.T) {
|
|||||||
t.Skip("ignored acceptance test")
|
t.Skip("ignored acceptance test")
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []int{1, 100}
|
tests := []int{50}
|
||||||
|
|
||||||
for _, entriesCount := range tests {
|
for _, entriesCount := range tests {
|
||||||
t.Run(fmt.Sprintf("%d", entriesCount), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%d", entriesCount), func(t *testing.T) {
|
||||||
cliPath, cliPathErr := GetCliPath()
|
cliPath, cliPathErr := getCliPath()
|
||||||
if cliPathErr != nil {
|
if cliPathErr != nil {
|
||||||
t.Errorf("failed to get cli path, err: %v", cliPathErr)
|
t.Errorf("failed to get cli path, err: %v", cliPathErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
tapCmdArgs := GetDefaultTapCommandArgs()
|
tapCmdArgs := getDefaultTapCommandArgs()
|
||||||
tapCmd := exec.Command(cliPath, tapCmdArgs...)
|
tapCmd := exec.Command(cliPath, tapCmdArgs...)
|
||||||
t.Logf("running command: %v", tapCmd.String())
|
t.Logf("running command: %v", tapCmd.String())
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
if err := CleanupCommand(tapCmd); err != nil {
|
if err := cleanupCommand(tapCmd); err != nil {
|
||||||
t.Logf("failed to cleanup tap command, err: %v", err)
|
t.Logf("failed to cleanup tap command, err: %v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -38,86 +38,87 @@ func TestTapAndFetch(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(30 * time.Second)
|
if err := waitTapPodsReady(); err != nil {
|
||||||
|
t.Errorf("failed to start tap pods on time, err: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
proxyUrl := "http://localhost:8080/api/v1/namespaces/mizu-tests/services/httpbin/proxy/get"
|
proxyUrl := "http://localhost:8080/api/v1/namespaces/mizu-tests/services/httpbin/proxy/get"
|
||||||
for i := 0; i < entriesCount; i++ {
|
for i := 0; i < entriesCount; i++ {
|
||||||
if _, requestErr := ExecuteHttpRequest(proxyUrl); requestErr != nil {
|
if _, requestErr := executeHttpRequest(proxyUrl); requestErr != nil {
|
||||||
t.Errorf("failed to send proxy request, err: %v", requestErr)
|
t.Errorf("failed to send proxy request, err: %v", requestErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
entriesCheckFunc := func() error {
|
||||||
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
|
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
|
||||||
|
|
||||||
entriesUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries?limit=%v&operator=lt×tamp=%v", entriesCount, timestamp)
|
entriesUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries?limit=%v&operator=lt×tamp=%v", entriesCount, timestamp)
|
||||||
requestResult, requestErr := ExecuteHttpRequest(entriesUrl)
|
requestResult, requestErr := executeHttpRequest(entriesUrl)
|
||||||
if requestErr != nil {
|
if requestErr != nil {
|
||||||
t.Errorf("failed to get entries, err: %v", requestErr)
|
return fmt.Errorf("failed to get entries, err: %v", requestErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
entries, ok := requestResult.([]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid entries type")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(entries) == 0 {
|
||||||
|
return fmt.Errorf("unexpected entries result - Expected more than 0 entries")
|
||||||
|
}
|
||||||
|
|
||||||
|
entry, ok := entries[0].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid entry type")
|
||||||
|
}
|
||||||
|
|
||||||
|
entryUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries/%v", entry["id"])
|
||||||
|
requestResult, requestErr = executeHttpRequest(entryUrl)
|
||||||
|
if requestErr != nil {
|
||||||
|
return fmt.Errorf("failed to get entry, err: %v", requestErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if requestResult == nil {
|
||||||
|
return fmt.Errorf("unexpected nil entry result")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := retriesExecute(ShortRetriesCount, entriesCheckFunc); err != nil {
|
||||||
|
t.Errorf("%v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
entries, ok := requestResult.([]interface{})
|
fetchCmdArgs := getDefaultFetchCommandArgs()
|
||||||
if !ok {
|
|
||||||
t.Errorf("invalid entries type")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(entries) != entriesCount {
|
|
||||||
t.Errorf("unexpected entries result - Expected: %v, actual: %v", entriesCount, len(entries))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
entry, ok := entries[0].(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("invalid entry type")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
entryUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries/%v", entry["id"])
|
|
||||||
requestResult, requestErr = ExecuteHttpRequest(entryUrl)
|
|
||||||
if requestErr != nil {
|
|
||||||
t.Errorf("failed to get entry, err: %v", requestErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if requestResult == nil {
|
|
||||||
t.Errorf("unexpected nil entry result")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fetchCmdArgs := GetDefaultFetchCommandArgs()
|
|
||||||
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
|
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
|
||||||
t.Logf("running command: %v", fetchCmd.String())
|
t.Logf("running command: %v", fetchCmd.String())
|
||||||
|
|
||||||
t.Cleanup(func() {
|
|
||||||
if err := CleanupCommand(fetchCmd); err != nil {
|
|
||||||
t.Logf("failed to cleanup fetch command, err: %v", err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := fetchCmd.Start(); err != nil {
|
if err := fetchCmd.Start(); err != nil {
|
||||||
t.Errorf("failed to start fetch command, err: %v", err)
|
t.Errorf("failed to start fetch command, err: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
harCheckFunc := func() error {
|
||||||
|
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
|
||||||
|
if readFileErr != nil {
|
||||||
|
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
|
||||||
|
}
|
||||||
|
|
||||||
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
|
harEntries, err := getEntriesFromHarBytes(harBytes)
|
||||||
if readFileErr != nil {
|
if err != nil {
|
||||||
t.Errorf("failed to read har file, err: %v", readFileErr)
|
return fmt.Errorf("failed to get entries from har, err: %v", err)
|
||||||
return
|
}
|
||||||
|
|
||||||
|
if len(harEntries) == 0 {
|
||||||
|
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
if err := retriesExecute(ShortRetriesCount, harCheckFunc); err != nil {
|
||||||
harEntries, err := GetEntriesFromHarBytes(harBytes)
|
t.Errorf("%v", err)
|
||||||
if err != nil {
|
|
||||||
t.Errorf("failed to get entries from har, err: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(harEntries) != entriesCount {
|
|
||||||
t.Errorf("unexpected har entries result - Expected: %v, actual: %v", entriesCount, len(harEntries))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@@ -10,9 +10,15 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path"
|
"path"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetCliPath() (string, error) {
|
const (
|
||||||
|
LongRetriesCount = 100
|
||||||
|
ShortRetriesCount = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
func getCliPath() (string, error) {
|
||||||
dir, filePathErr := os.Getwd()
|
dir, filePathErr := os.Getwd()
|
||||||
if filePathErr != nil {
|
if filePathErr != nil {
|
||||||
return "", filePathErr
|
return "", filePathErr
|
||||||
@@ -22,34 +28,74 @@ func GetCliPath() (string, error) {
|
|||||||
return cliPath, nil
|
return cliPath, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetDefaultCommandArgs() []string {
|
func getDefaultCommandArgs() []string {
|
||||||
setFlag := "--set"
|
setFlag := "--set"
|
||||||
telemetry := "telemetry=false"
|
telemetry := "telemetry=false"
|
||||||
|
|
||||||
return []string{setFlag, telemetry}
|
return []string{setFlag, telemetry}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetDefaultTapCommandArgs() []string {
|
func getDefaultTapCommandArgs() []string {
|
||||||
tapCommand := "tap"
|
tapCommand := "tap"
|
||||||
setFlag := "--set"
|
setFlag := "--set"
|
||||||
namespaces := "tap.namespaces=mizu-tests"
|
namespaces := "tap.namespaces=mizu-tests"
|
||||||
agentImage := "agent-image=gcr.io/up9-docker-hub/mizu/ci:0.0.0"
|
agentImage := "agent-image=gcr.io/up9-docker-hub/mizu/ci:0.0.0"
|
||||||
imagePullPolicy := "image-pull-policy=Never"
|
imagePullPolicy := "image-pull-policy=Never"
|
||||||
|
|
||||||
defaultCmdArgs := GetDefaultCommandArgs()
|
defaultCmdArgs := getDefaultCommandArgs()
|
||||||
|
|
||||||
return append([]string{tapCommand, setFlag, namespaces, setFlag, agentImage, setFlag, imagePullPolicy}, defaultCmdArgs...)
|
return append([]string{tapCommand, setFlag, namespaces, setFlag, agentImage, setFlag, imagePullPolicy}, defaultCmdArgs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetDefaultFetchCommandArgs() []string {
|
func getDefaultFetchCommandArgs() []string {
|
||||||
tapCommand := "fetch"
|
tapCommand := "fetch"
|
||||||
|
|
||||||
defaultCmdArgs := GetDefaultCommandArgs()
|
defaultCmdArgs := getDefaultCommandArgs()
|
||||||
|
|
||||||
return append([]string{tapCommand}, defaultCmdArgs...)
|
return append([]string{tapCommand}, defaultCmdArgs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func JsonBytesToInterface(jsonBytes []byte) (interface{}, error) {
|
func retriesExecute(retriesCount int, executeFunc func() error) error {
|
||||||
|
var lastError error
|
||||||
|
|
||||||
|
for i := 0; i < retriesCount; i++ {
|
||||||
|
if err := executeFunc(); err != nil {
|
||||||
|
lastError = err
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitTapPodsReady() error {
|
||||||
|
resolvingUrl := fmt.Sprintf("http://localhost:8899/mizu/status/tappersCount")
|
||||||
|
tapPodsReadyFunc := func() error {
|
||||||
|
requestResult, requestErr := executeHttpRequest(resolvingUrl)
|
||||||
|
if requestErr != nil {
|
||||||
|
return requestErr
|
||||||
|
}
|
||||||
|
|
||||||
|
tappersCount, ok := requestResult.(float64)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid tappers count type")
|
||||||
|
}
|
||||||
|
|
||||||
|
if tappersCount == 0 {
|
||||||
|
return fmt.Errorf("no tappers running")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return retriesExecute(LongRetriesCount, tapPodsReadyFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func jsonBytesToInterface(jsonBytes []byte) (interface{}, error) {
|
||||||
var result interface{}
|
var result interface{}
|
||||||
if parseErr := json.Unmarshal(jsonBytes, &result); parseErr != nil {
|
if parseErr := json.Unmarshal(jsonBytes, &result); parseErr != nil {
|
||||||
return nil, parseErr
|
return nil, parseErr
|
||||||
@@ -58,7 +104,7 @@ func JsonBytesToInterface(jsonBytes []byte) (interface{}, error) {
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExecuteHttpRequest(url string) (interface{}, error) {
|
func executeHttpRequest(url string) (interface{}, error) {
|
||||||
response, requestErr := http.Get(url)
|
response, requestErr := http.Get(url)
|
||||||
if requestErr != nil {
|
if requestErr != nil {
|
||||||
return nil, requestErr
|
return nil, requestErr
|
||||||
@@ -66,15 +112,17 @@ func ExecuteHttpRequest(url string) (interface{}, error) {
|
|||||||
return nil, fmt.Errorf("invalid status code %v", response.StatusCode)
|
return nil, fmt.Errorf("invalid status code %v", response.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer func() { response.Body.Close() }()
|
||||||
|
|
||||||
data, readErr := ioutil.ReadAll(response.Body)
|
data, readErr := ioutil.ReadAll(response.Body)
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
return nil, readErr
|
return nil, readErr
|
||||||
}
|
}
|
||||||
|
|
||||||
return JsonBytesToInterface(data)
|
return jsonBytesToInterface(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func CleanupCommand(cmd *exec.Cmd) error {
|
func cleanupCommand(cmd *exec.Cmd) error {
|
||||||
if err := cmd.Process.Signal(syscall.SIGQUIT); err != nil {
|
if err := cmd.Process.Signal(syscall.SIGQUIT); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -86,8 +134,8 @@ func CleanupCommand(cmd *exec.Cmd) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){
|
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){
|
||||||
harInterface, convertErr := JsonBytesToInterface(harBytes)
|
harInterface, convertErr := jsonBytesToInterface(harBytes)
|
||||||
if convertErr != nil {
|
if convertErr != nil {
|
||||||
return nil, convertErr
|
return nil, convertErr
|
||||||
}
|
}
|
||||||
@@ -97,14 +145,12 @@ func GetEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){
|
|||||||
return nil, errors.New("invalid har type")
|
return nil, errors.New("invalid har type")
|
||||||
}
|
}
|
||||||
|
|
||||||
harLogInterface := har["log"]
|
harLog, ok := har["log"].(map[string]interface{})
|
||||||
harLog, ok := harLogInterface.(map[string]interface{})
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("invalid har log type")
|
return nil, errors.New("invalid har log type")
|
||||||
}
|
}
|
||||||
|
|
||||||
harEntriesInterface := harLog["entries"]
|
harEntries, ok := harLog["entries"].([]interface{})
|
||||||
harEntries, ok := harEntriesInterface.([]interface{})
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("invalid har entries type")
|
return nil, errors.New("invalid har entries type")
|
||||||
}
|
}
|
||||||
|
@@ -28,6 +28,7 @@ func init() {
|
|||||||
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||||
if isTapper {
|
if isTapper {
|
||||||
rlog.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
rlog.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
||||||
|
providers.TapperAdded()
|
||||||
} else {
|
} else {
|
||||||
rlog.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
rlog.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||||
socketListLock.Lock()
|
socketListLock.Lock()
|
||||||
@@ -39,6 +40,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
|||||||
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
||||||
if isTapper {
|
if isTapper {
|
||||||
rlog.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
|
rlog.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
|
||||||
|
providers.TapperRemoved()
|
||||||
} else {
|
} else {
|
||||||
rlog.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
rlog.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
||||||
socketListLock.Lock()
|
socketListLock.Lock()
|
||||||
|
@@ -30,3 +30,7 @@ func PostTappedPods(c *gin.Context) {
|
|||||||
api.BroadcastToBrowserClients(jsonBytes)
|
api.BroadcastToBrowserClients(jsonBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetTappersCount(c *gin.Context) {
|
||||||
|
c.JSON(http.StatusOK, providers.TappersCount)
|
||||||
|
}
|
||||||
|
@@ -4,14 +4,18 @@ import (
|
|||||||
"github.com/patrickmn/go-cache"
|
"github.com/patrickmn/go-cache"
|
||||||
"github.com/up9inc/mizu/shared"
|
"github.com/up9inc/mizu/shared"
|
||||||
"github.com/up9inc/mizu/tap"
|
"github.com/up9inc/mizu/tap"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const tlsLinkRetainmentTime = time.Minute * 15
|
const tlsLinkRetainmentTime = time.Minute * 15
|
||||||
|
|
||||||
var (
|
var (
|
||||||
TapStatus shared.TapStatus
|
TappersCount int
|
||||||
|
TapStatus shared.TapStatus
|
||||||
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
||||||
|
|
||||||
|
tappersCountLock = sync.Mutex{}
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetAllRecentTLSAddresses() []string {
|
func GetAllRecentTLSAddresses() []string {
|
||||||
@@ -26,3 +30,15 @@ func GetAllRecentTLSAddresses() []string {
|
|||||||
|
|
||||||
return recentTLSLinks
|
return recentTLSLinks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TapperAdded() {
|
||||||
|
tappersCountLock.Lock()
|
||||||
|
TappersCount++
|
||||||
|
tappersCountLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TapperRemoved() {
|
||||||
|
tappersCountLock.Lock()
|
||||||
|
TappersCount--
|
||||||
|
tappersCountLock.Unlock()
|
||||||
|
}
|
||||||
|
@@ -9,4 +9,6 @@ func StatusRoutes(ginApp *gin.Engine) {
|
|||||||
routeGroup := ginApp.Group("/status")
|
routeGroup := ginApp.Group("/status")
|
||||||
|
|
||||||
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
|
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
|
||||||
|
|
||||||
|
routeGroup.GET("/tappersCount", controllers.GetTappersCount)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user