mirror of
				https://github.com/kubeshark/kubeshark.git
				synced 2025-10-22 15:58:44 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			241 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package acceptanceTests
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"github.com/go-redis/redis/v8"
 | |
| 	amqp "github.com/rabbitmq/amqp091-go"
 | |
| 	"os/exec"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| func TestRedis(t *testing.T) {
 | |
| 	if testing.Short() {
 | |
| 		t.Skip("ignored acceptance test")
 | |
| 	}
 | |
| 
 | |
| 	cliPath, cliPathErr := GetCliPath()
 | |
| 	if cliPathErr != nil {
 | |
| 		t.Errorf("failed to get cli path, err: %v", cliPathErr)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	tapCmdArgs := GetDefaultTapCommandArgs()
 | |
| 
 | |
| 	tapNamespace := GetDefaultTapNamespace()
 | |
| 	tapCmdArgs = append(tapCmdArgs, tapNamespace...)
 | |
| 
 | |
| 	tapCmd := exec.Command(cliPath, tapCmdArgs...)
 | |
| 	t.Logf("running command: %v", tapCmd.String())
 | |
| 
 | |
| 	t.Cleanup(func() {
 | |
| 		if err := CleanupCommand(tapCmd); err != nil {
 | |
| 			t.Logf("failed to cleanup tap command, err: %v", err)
 | |
| 		}
 | |
| 	})
 | |
| 
 | |
| 	if err := tapCmd.Start(); err != nil {
 | |
| 		t.Errorf("failed to start tap command, err: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	apiServerUrl := GetApiServerUrl(DefaultApiServerPort)
 | |
| 
 | |
| 	if err := WaitTapPodsReady(apiServerUrl); err != nil {
 | |
| 		t.Errorf("failed to start tap pods on time, err: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	kubernetesProvider, err := NewKubernetesProvider()
 | |
| 	if err != nil {
 | |
| 		t.Errorf("failed to create k8s provider, err %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	redisExternalIp, err := kubernetesProvider.GetServiceExternalIp(ctx, DefaultNamespaceName, "redis")
 | |
| 	if err != nil {
 | |
| 		t.Errorf("failed to get redis external ip, err: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	rdb := redis.NewClient(&redis.Options{
 | |
| 		Addr: fmt.Sprintf("%v:6379", redisExternalIp),
 | |
| 	})
 | |
| 
 | |
| 	for i := 0; i < DefaultEntriesCount/5; i++ {
 | |
| 		requestErr := rdb.Ping(ctx).Err()
 | |
| 		if requestErr != nil {
 | |
| 			t.Errorf("failed to send redis request, err: %v", requestErr)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < DefaultEntriesCount/5; i++ {
 | |
| 		requestErr := rdb.Set(ctx, "key", "value", -1).Err()
 | |
| 		if requestErr != nil {
 | |
| 			t.Errorf("failed to send redis request, err: %v", requestErr)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < DefaultEntriesCount/5; i++ {
 | |
| 		requestErr := rdb.Exists(ctx, "key").Err()
 | |
| 		if requestErr != nil {
 | |
| 			t.Errorf("failed to send redis request, err: %v", requestErr)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < DefaultEntriesCount/5; i++ {
 | |
| 		requestErr := rdb.Get(ctx, "key").Err()
 | |
| 		if requestErr != nil {
 | |
| 			t.Errorf("failed to send redis request, err: %v", requestErr)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < DefaultEntriesCount/5; i++ {
 | |
| 		requestErr := rdb.Del(ctx, "key").Err()
 | |
| 		if requestErr != nil {
 | |
| 			t.Errorf("failed to send redis request, err: %v", requestErr)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	RunCypressTests(t, "npx cypress run --spec  \"cypress/integration/tests/Redis.js\"")
 | |
| }
 | |
| 
 | |
| func TestAmqp(t *testing.T) {
 | |
| 	if testing.Short() {
 | |
| 		t.Skip("ignored acceptance test")
 | |
| 	}
 | |
| 
 | |
| 	cliPath, cliPathErr := GetCliPath()
 | |
| 	if cliPathErr != nil {
 | |
| 		t.Errorf("failed to get cli path, err: %v", cliPathErr)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	tapCmdArgs := GetDefaultTapCommandArgs()
 | |
| 
 | |
| 	tapNamespace := GetDefaultTapNamespace()
 | |
| 	tapCmdArgs = append(tapCmdArgs, tapNamespace...)
 | |
| 
 | |
| 	tapCmd := exec.Command(cliPath, tapCmdArgs...)
 | |
| 	t.Logf("running command: %v", tapCmd.String())
 | |
| 
 | |
| 	t.Cleanup(func() {
 | |
| 		if err := CleanupCommand(tapCmd); err != nil {
 | |
| 			t.Logf("failed to cleanup tap command, err: %v", err)
 | |
| 		}
 | |
| 	})
 | |
| 
 | |
| 	if err := tapCmd.Start(); err != nil {
 | |
| 		t.Errorf("failed to start tap command, err: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	apiServerUrl := GetApiServerUrl(DefaultApiServerPort)
 | |
| 
 | |
| 	if err := WaitTapPodsReady(apiServerUrl); err != nil {
 | |
| 		t.Errorf("failed to start tap pods on time, err: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	kubernetesProvider, err := NewKubernetesProvider()
 | |
| 	if err != nil {
 | |
| 		t.Errorf("failed to create k8s provider, err %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	rabbitmqExternalIp, err := kubernetesProvider.GetServiceExternalIp(ctx, DefaultNamespaceName, "rabbitmq")
 | |
| 	if err != nil {
 | |
| 		t.Errorf("failed to get RabbitMQ external ip, err: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	conn, err := amqp.Dial(fmt.Sprintf("amqp://guest:guest@%v:5672/", rabbitmqExternalIp))
 | |
| 	if err != nil {
 | |
| 		t.Errorf("failed to connect to RabbitMQ, err: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	defer conn.Close()
 | |
| 
 | |
| 	// Temporary fix for missing amqp entries
 | |
| 	time.Sleep(10 * time.Second)
 | |
| 
 | |
| 	for i := 0; i < DefaultEntriesCount/5; i++ {
 | |
| 		ch, err := conn.Channel()
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to open a channel, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		exchangeName := "exchange"
 | |
| 		err = ch.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil)
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to declare an exchange, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		q, err := ch.QueueDeclare("queue", true, false, false, false, nil)
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to declare a queue, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		routingKey := "routing_key"
 | |
| 		err = ch.QueueBind(q.Name, routingKey, exchangeName, false, nil)
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to bind the queue, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		err = ch.Publish(exchangeName, routingKey, false, false,
 | |
| 			amqp.Publishing{
 | |
| 				DeliveryMode: amqp.Persistent,
 | |
| 				ContentType:  "text/plain",
 | |
| 				Body:         []byte("message"),
 | |
| 			})
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to publish a message, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		msgChan, err := ch.Consume(q.Name, "Consumer", true, false, false, false, nil)
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to create a consumer, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case <-msgChan:
 | |
| 			break
 | |
| 		case <-time.After(3 * time.Second):
 | |
| 			t.Errorf("failed to consume a message on time")
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		err = ch.ExchangeDelete(exchangeName, false, false)
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to delete the exchange, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		_, err = ch.QueueDelete(q.Name, false, false, false)
 | |
| 		if err != nil {
 | |
| 			t.Errorf("failed to delete the queue, err: %v", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		ch.Close()
 | |
| 	}
 | |
| 
 | |
| 	RunCypressTests(t, "npx cypress run --spec  \"cypress/integration/tests/Rabbit.js\"")
 | |
| }
 |