mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-01-30 05:54:21 +00:00
Move the helper methods waitTimeout and checkDBHasEntries from tap_test.go to testsUtils.go
This commit is contained in:
@@ -10,81 +10,10 @@ import (
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// waitTimeout waits for the waitgroup for the specified max timeout.
|
||||
// Returns true if waiting timed out.
|
||||
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
|
||||
channel := make(chan struct{})
|
||||
go func() {
|
||||
defer close(channel)
|
||||
wg.Wait()
|
||||
}()
|
||||
select {
|
||||
case <-channel:
|
||||
return false // completed normally
|
||||
case <-time.After(timeout):
|
||||
return true // timed out
|
||||
}
|
||||
}
|
||||
|
||||
// checkDBHasEntries checks whether there are any entries in the database
|
||||
// before the given timestamp. Returns a slice of non-empty entries if it succeeds.
|
||||
func checkDBHasEntries(t *testing.T, timestamp int64, limit int) (entries []map[string]interface{}) {
|
||||
query := fmt.Sprintf("timestamp < %d and limit(%d)", timestamp, limit)
|
||||
webSocketUrl := getWebSocketUrl(defaultApiServerPort)
|
||||
|
||||
connection, _, err := websocket.DefaultDialer.Dial(webSocketUrl, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
defer connection.Close()
|
||||
|
||||
handleWSConnection := func(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
_, message, err := connection.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var data map[string]interface{}
|
||||
if err = json.Unmarshal([]byte(message), &data); err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if data["messageType"] == "entry" {
|
||||
entries = append(entries, data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = connection.WriteMessage(websocket.TextMessage, []byte(query))
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
go handleWSConnection(&wg)
|
||||
wg.Add(1)
|
||||
|
||||
waitTimeout(&wg, 1*time.Second)
|
||||
|
||||
if len(entries) == 0 {
|
||||
t.Error("unexpected entries result - Expected more than 0 entries")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestTap(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("ignored acceptance test")
|
||||
|
||||
@@ -11,10 +11,12 @@ import (
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
)
|
||||
|
||||
@@ -316,6 +318,74 @@ func daemonCleanup(t *testing.T, viewCmd *exec.Cmd) {
|
||||
}
|
||||
}
|
||||
|
||||
// waitTimeout waits for the waitgroup for the specified max timeout.
|
||||
// Returns true if waiting timed out.
|
||||
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
|
||||
channel := make(chan struct{})
|
||||
go func() {
|
||||
defer close(channel)
|
||||
wg.Wait()
|
||||
}()
|
||||
select {
|
||||
case <-channel:
|
||||
return false // completed normally
|
||||
case <-time.After(timeout):
|
||||
return true // timed out
|
||||
}
|
||||
}
|
||||
|
||||
// checkDBHasEntries checks whether there are any entries in the database
|
||||
// before the given timestamp. Returns a slice of non-empty entries if it succeeds.
|
||||
func checkDBHasEntries(t *testing.T, timestamp int64, limit int) (entries []map[string]interface{}) {
|
||||
query := fmt.Sprintf("timestamp < %d and limit(%d)", timestamp, limit)
|
||||
webSocketUrl := getWebSocketUrl(defaultApiServerPort)
|
||||
|
||||
connection, _, err := websocket.DefaultDialer.Dial(webSocketUrl, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
defer connection.Close()
|
||||
|
||||
handleWSConnection := func(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
_, message, err := connection.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var data map[string]interface{}
|
||||
if err = json.Unmarshal([]byte(message), &data); err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if data["messageType"] == "entry" {
|
||||
entries = append(entries, data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = connection.WriteMessage(websocket.TextMessage, []byte(query))
|
||||
if err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
go handleWSConnection(&wg)
|
||||
wg.Add(1)
|
||||
|
||||
waitTimeout(&wg, 1*time.Second)
|
||||
|
||||
if len(entries) == 0 {
|
||||
t.Error("unexpected entries result - Expected more than 0 entries")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func Contains(slice []string, containsValue string) bool {
|
||||
for _, sliceValue := range slice {
|
||||
if sliceValue == containsValue {
|
||||
|
||||
Reference in New Issue
Block a user