mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-23 23:08:35 +00:00
241 lines
5.7 KiB
Go
241 lines
5.7 KiB
Go
package acceptanceTests
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/go-redis/redis/v8"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"os/exec"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestRedis(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("ignored acceptance test")
|
|
}
|
|
|
|
cliPath, cliPathErr := GetCliPath()
|
|
if cliPathErr != nil {
|
|
t.Errorf("failed to get cli path, err: %v", cliPathErr)
|
|
return
|
|
}
|
|
|
|
tapCmdArgs := GetDefaultTapCommandArgs()
|
|
|
|
tapNamespace := GetDefaultTapNamespace()
|
|
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
|
|
|
|
tapCmd := exec.Command(cliPath, tapCmdArgs...)
|
|
t.Logf("running command: %v", tapCmd.String())
|
|
|
|
t.Cleanup(func() {
|
|
if err := CleanupCommand(tapCmd); err != nil {
|
|
t.Logf("failed to cleanup tap command, err: %v", err)
|
|
}
|
|
})
|
|
|
|
if err := tapCmd.Start(); err != nil {
|
|
t.Errorf("failed to start tap command, err: %v", err)
|
|
return
|
|
}
|
|
|
|
apiServerUrl := GetApiServerUrl(DefaultApiServerPort)
|
|
|
|
if err := WaitTapPodsReady(apiServerUrl); err != nil {
|
|
t.Errorf("failed to start tap pods on time, err: %v", err)
|
|
return
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
kubernetesProvider, err := NewKubernetesProvider()
|
|
if err != nil {
|
|
t.Errorf("failed to create k8s provider, err %v", err)
|
|
return
|
|
}
|
|
|
|
redisExternalIp, err := kubernetesProvider.GetServiceExternalIp(ctx, DefaultNamespaceName, "redis")
|
|
if err != nil {
|
|
t.Errorf("failed to get redis external ip, err: %v", err)
|
|
return
|
|
}
|
|
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: fmt.Sprintf("%v:6379", redisExternalIp),
|
|
})
|
|
|
|
for i := 0; i < DefaultEntriesCount/5; i++ {
|
|
requestErr := rdb.Ping(ctx).Err()
|
|
if requestErr != nil {
|
|
t.Errorf("failed to send redis request, err: %v", requestErr)
|
|
return
|
|
}
|
|
}
|
|
|
|
for i := 0; i < DefaultEntriesCount/5; i++ {
|
|
requestErr := rdb.Set(ctx, "key", "value", -1).Err()
|
|
if requestErr != nil {
|
|
t.Errorf("failed to send redis request, err: %v", requestErr)
|
|
return
|
|
}
|
|
}
|
|
|
|
for i := 0; i < DefaultEntriesCount/5; i++ {
|
|
requestErr := rdb.Exists(ctx, "key").Err()
|
|
if requestErr != nil {
|
|
t.Errorf("failed to send redis request, err: %v", requestErr)
|
|
return
|
|
}
|
|
}
|
|
|
|
for i := 0; i < DefaultEntriesCount/5; i++ {
|
|
requestErr := rdb.Get(ctx, "key").Err()
|
|
if requestErr != nil {
|
|
t.Errorf("failed to send redis request, err: %v", requestErr)
|
|
return
|
|
}
|
|
}
|
|
|
|
for i := 0; i < DefaultEntriesCount/5; i++ {
|
|
requestErr := rdb.Del(ctx, "key").Err()
|
|
if requestErr != nil {
|
|
t.Errorf("failed to send redis request, err: %v", requestErr)
|
|
return
|
|
}
|
|
}
|
|
|
|
RunCypressTests(t, "npx cypress run --spec \"cypress/integration/tests/Redis.js\"")
|
|
}
|
|
|
|
func TestAmqp(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("ignored acceptance test")
|
|
}
|
|
|
|
cliPath, cliPathErr := GetCliPath()
|
|
if cliPathErr != nil {
|
|
t.Errorf("failed to get cli path, err: %v", cliPathErr)
|
|
return
|
|
}
|
|
|
|
tapCmdArgs := GetDefaultTapCommandArgs()
|
|
|
|
tapNamespace := GetDefaultTapNamespace()
|
|
tapCmdArgs = append(tapCmdArgs, tapNamespace...)
|
|
|
|
tapCmd := exec.Command(cliPath, tapCmdArgs...)
|
|
t.Logf("running command: %v", tapCmd.String())
|
|
|
|
t.Cleanup(func() {
|
|
if err := CleanupCommand(tapCmd); err != nil {
|
|
t.Logf("failed to cleanup tap command, err: %v", err)
|
|
}
|
|
})
|
|
|
|
if err := tapCmd.Start(); err != nil {
|
|
t.Errorf("failed to start tap command, err: %v", err)
|
|
return
|
|
}
|
|
|
|
apiServerUrl := GetApiServerUrl(DefaultApiServerPort)
|
|
|
|
if err := WaitTapPodsReady(apiServerUrl); err != nil {
|
|
t.Errorf("failed to start tap pods on time, err: %v", err)
|
|
return
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
kubernetesProvider, err := NewKubernetesProvider()
|
|
if err != nil {
|
|
t.Errorf("failed to create k8s provider, err %v", err)
|
|
return
|
|
}
|
|
|
|
rabbitmqExternalIp, err := kubernetesProvider.GetServiceExternalIp(ctx, DefaultNamespaceName, "rabbitmq")
|
|
if err != nil {
|
|
t.Errorf("failed to get RabbitMQ external ip, err: %v", err)
|
|
return
|
|
}
|
|
|
|
conn, err := amqp.Dial(fmt.Sprintf("amqp://guest:guest@%v:5672/", rabbitmqExternalIp))
|
|
if err != nil {
|
|
t.Errorf("failed to connect to RabbitMQ, err: %v", err)
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Temporary fix for missing amqp entries
|
|
time.Sleep(10 * time.Second)
|
|
|
|
for i := 0; i < DefaultEntriesCount/5; i++ {
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
t.Errorf("failed to open a channel, err: %v", err)
|
|
return
|
|
}
|
|
|
|
exchangeName := "exchange"
|
|
err = ch.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil)
|
|
if err != nil {
|
|
t.Errorf("failed to declare an exchange, err: %v", err)
|
|
return
|
|
}
|
|
|
|
q, err := ch.QueueDeclare("queue", true, false, false, false, nil)
|
|
if err != nil {
|
|
t.Errorf("failed to declare a queue, err: %v", err)
|
|
return
|
|
}
|
|
|
|
routingKey := "routing_key"
|
|
err = ch.QueueBind(q.Name, routingKey, exchangeName, false, nil)
|
|
if err != nil {
|
|
t.Errorf("failed to bind the queue, err: %v", err)
|
|
return
|
|
}
|
|
|
|
err = ch.Publish(exchangeName, routingKey, false, false,
|
|
amqp.Publishing{
|
|
DeliveryMode: amqp.Persistent,
|
|
ContentType: "text/plain",
|
|
Body: []byte("message"),
|
|
})
|
|
if err != nil {
|
|
t.Errorf("failed to publish a message, err: %v", err)
|
|
return
|
|
}
|
|
|
|
msgChan, err := ch.Consume(q.Name, "Consumer", true, false, false, false, nil)
|
|
if err != nil {
|
|
t.Errorf("failed to create a consumer, err: %v", err)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-msgChan:
|
|
break
|
|
case <-time.After(3 * time.Second):
|
|
t.Errorf("failed to consume a message on time")
|
|
return
|
|
}
|
|
|
|
err = ch.ExchangeDelete(exchangeName, false, false)
|
|
if err != nil {
|
|
t.Errorf("failed to delete the exchange, err: %v", err)
|
|
return
|
|
}
|
|
|
|
_, err = ch.QueueDelete(q.Name, false, false, false)
|
|
if err != nil {
|
|
t.Errorf("failed to delete the queue, err: %v", err)
|
|
return
|
|
}
|
|
|
|
ch.Close()
|
|
}
|
|
|
|
RunCypressTests(t, "npx cypress run --spec \"cypress/integration/tests/Rabbit.js\"")
|
|
}
|