kubeshark/cmd/tapPcapRunner.go
2023-10-16 23:41:33 +03:00

536 lines
12 KiB
Go

package cmd
import (
"archive/tar"
"bufio"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/internal/connect"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
)
func logPullingImage(image string, reader io.ReadCloser) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
text := scanner.Text()
var data map[string]interface{}
if err := json.Unmarshal([]byte(text), &data); err != nil {
log.Error().Err(err).Send()
continue
}
var id string
if val, ok := data["id"]; ok {
id = val.(string)
}
var status string
if val, ok := data["status"]; ok {
status = val.(string)
}
var progress string
if val, ok := data["progress"]; ok {
progress = val.(string)
}
e := log.Info()
if image != "" {
e = e.Str("image", image)
}
if progress != "" {
e = e.Str("progress", progress)
}
e.Msg(fmt.Sprintf("[%-12s] %-18s", id, status))
}
}
func pullImages(ctx context.Context, cli *client.Client, imageFront string, imageHub string, imageWorker string) error {
log.Info().Msg("Pulling images...")
readerFront, err := cli.ImagePull(ctx, imageFront, types.ImagePullOptions{})
if err != nil {
return err
}
defer readerFront.Close()
logPullingImage(imageFront, readerFront)
readerHub, err := cli.ImagePull(ctx, imageHub, types.ImagePullOptions{})
if err != nil {
return err
}
defer readerHub.Close()
logPullingImage(imageHub, readerHub)
readerWorker, err := cli.ImagePull(ctx, imageWorker, types.ImagePullOptions{})
if err != nil {
return err
}
defer readerWorker.Close()
logPullingImage(imageWorker, readerWorker)
return nil
}
func cleanUpOldContainers(
ctx context.Context,
cli *client.Client,
nameFront string,
nameHub string,
nameWorker string,
) error {
containers, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true})
if err != nil {
return err
}
for _, container := range containers {
f := fmt.Sprintf("/%s", nameFront)
h := fmt.Sprintf("/%s", nameHub)
w := fmt.Sprintf("/%s", nameWorker)
if utils.Contains(container.Names, f) || utils.Contains(container.Names, h) || utils.Contains(container.Names, w) {
err = cli.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true})
if err != nil {
return err
}
}
}
return nil
}
func createAndStartContainers(
ctx context.Context,
cli *client.Client,
imageFront string,
imageHub string,
imageWorker string,
tarReader io.Reader,
) (
respFront container.ContainerCreateCreatedBody,
respHub container.ContainerCreateCreatedBody,
respWorker container.ContainerCreateCreatedBody,
workerIPAddr string,
err error,
) {
log.Info().Msg("Creating containers...")
nameFront := fmt.Sprintf("%s-front", misc.Program)
nameHub := fmt.Sprintf("%s-hub", misc.Program)
nameWorker := fmt.Sprintf("%s-worker", misc.Program)
err = cleanUpOldContainers(ctx, cli, nameFront, nameHub, nameWorker)
if err != nil {
return
}
hostIP := "0.0.0.0"
hostConfigFront := &container.HostConfig{
PortBindings: nat.PortMap{
nat.Port(fmt.Sprintf("%d/tcp", configStructs.ContainerPort)): []nat.PortBinding{
{
HostIP: hostIP,
HostPort: fmt.Sprintf("%d", config.Config.Tap.Proxy.Front.Port),
},
},
},
}
respFront, err = cli.ContainerCreate(ctx, &container.Config{
Image: imageFront,
Tty: false,
Env: []string{
"REACT_APP_DEFAULT_FILTER= ",
"REACT_APP_HUB_HOST= ",
fmt.Sprintf("REACT_APP_HUB_PORT=:%d", config.Config.Tap.Proxy.Hub.Port),
},
}, hostConfigFront, nil, nil, nameFront)
if err != nil {
return
}
hostConfigHub := &container.HostConfig{
PortBindings: nat.PortMap{
nat.Port(fmt.Sprintf("%d/tcp", config.Config.Tap.Proxy.Hub.SrvPort)): []nat.PortBinding{
{
HostIP: hostIP,
HostPort: fmt.Sprintf("%d", config.Config.Tap.Proxy.Hub.Port),
},
},
},
}
cmdHub := []string{"-port", fmt.Sprintf("%d", config.Config.Tap.Proxy.Hub.SrvPort)}
if config.DebugMode {
cmdHub = append(cmdHub, fmt.Sprintf("-%s", config.DebugFlag))
}
respHub, err = cli.ContainerCreate(ctx, &container.Config{
Image: imageHub,
Cmd: cmdHub,
Tty: false,
ExposedPorts: nat.PortSet{nat.Port(fmt.Sprintf("%d/tcp", config.Config.Tap.Proxy.Hub.SrvPort)): {}},
}, hostConfigHub, nil, nil, nameHub)
if err != nil {
return
}
cmdWorker := []string{"-f", "./import", "-port", fmt.Sprintf("%d", config.Config.Tap.Proxy.Worker.SrvPort)}
if config.DebugMode {
cmdWorker = append(cmdWorker, fmt.Sprintf("-%s", config.DebugFlag))
}
respWorker, err = cli.ContainerCreate(ctx, &container.Config{
Image: imageWorker,
Cmd: cmdWorker,
Tty: false,
}, nil, nil, nil, nameWorker)
if err != nil {
return
}
if err = cli.CopyToContainer(ctx, respWorker.ID, "/app/import", tarReader, types.CopyToContainerOptions{}); err != nil {
return
}
log.Info().Msg("Starting containers...")
if err = cli.ContainerStart(ctx, respFront.ID, types.ContainerStartOptions{}); err != nil {
return
}
if err = cli.ContainerStart(ctx, respHub.ID, types.ContainerStartOptions{}); err != nil {
return
}
if err = cli.ContainerStart(ctx, respWorker.ID, types.ContainerStartOptions{}); err != nil {
return
}
var containerWorker types.ContainerJSON
containerWorker, err = cli.ContainerInspect(ctx, respWorker.ID)
if err != nil {
return
}
workerIPAddr = containerWorker.NetworkSettings.IPAddress
return
}
func stopAndRemoveContainers(
ctx context.Context,
cli *client.Client,
respFront container.ContainerCreateCreatedBody,
respHub container.ContainerCreateCreatedBody,
respWorker container.ContainerCreateCreatedBody,
) (err error) {
log.Warn().Msg("Stopping containers...")
err = cli.ContainerStop(ctx, respFront.ID, nil)
if err != nil {
return
}
err = cli.ContainerStop(ctx, respHub.ID, nil)
if err != nil {
return
}
err = cli.ContainerStop(ctx, respWorker.ID, nil)
if err != nil {
return
}
log.Warn().Msg("Removing containers...")
err = cli.ContainerRemove(ctx, respFront.ID, types.ContainerRemoveOptions{})
if err != nil {
return
}
err = cli.ContainerRemove(ctx, respHub.ID, types.ContainerRemoveOptions{})
if err != nil {
return
}
err = cli.ContainerRemove(ctx, respWorker.ID, types.ContainerRemoveOptions{})
if err != nil {
return
}
return
}
func downloadTarFromS3(s3Url string) (tarPath string, err error) {
u, err := url.Parse(s3Url)
if err != nil {
return
}
bucket := u.Host
key := u.Path[1:]
var cfg aws.Config
cfg, err = awsConfig.LoadDefaultConfig(context.TODO())
if err != nil {
return
}
client := s3.NewFromConfig(cfg)
var listObjectsOutput *s3.ListObjectsV2Output
listObjectsOutput, err = client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(key),
})
if err != nil {
return
}
var file *os.File
file, err = os.CreateTemp(os.TempDir(), fmt.Sprintf("%s_*.%s", strings.TrimSuffix(filepath.Base(key), filepath.Ext(key)), filepath.Ext(key)))
if err != nil {
return
}
defer file.Close()
log.Info().Str("bucket", bucket).Str("key", key).Msg("Downloading from S3")
downloader := manager.NewDownloader(client)
_, err = downloader.Download(context.TODO(), file, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Info().Err(err).Msg("S3 object is not found. Assuming URL is not a single object. Listing the objects in given folder or the bucket to download...")
var tempDirPath string
tempDirPath, err = os.MkdirTemp(os.TempDir(), "kubeshark_*")
if err != nil {
return
}
var wg sync.WaitGroup
for _, object := range listObjectsOutput.Contents {
wg.Add(1)
go func(object s3Types.Object) {
defer wg.Done()
objectKey := *object.Key
fullPath := filepath.Join(tempDirPath, objectKey)
err = os.MkdirAll(filepath.Dir(fullPath), os.ModePerm)
if err != nil {
return
}
var objectFile *os.File
objectFile, err = os.Create(fullPath)
if err != nil {
return
}
defer objectFile.Close()
log.Info().Str("bucket", bucket).Str("key", objectKey).Msg("Downloading from S3")
downloader := manager.NewDownloader(client)
_, err = downloader.Download(context.TODO(), objectFile, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
})
if err != nil {
return
}
}(object)
}
wg.Wait()
tarPath, err = tarDirectory(tempDirPath)
return
}
tarPath = file.Name()
return
}
func tarDirectory(dirPath string) (string, error) {
tarPath := fmt.Sprintf("%s.tar.gz", dirPath)
var file *os.File
file, err := os.Create(tarPath)
if err != nil {
return "", err
}
defer file.Close()
gzipWriter := gzip.NewWriter(file)
defer gzipWriter.Close()
tarWriter := tar.NewWriter(gzipWriter)
defer tarWriter.Close()
walker := func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return err
}
header := &tar.Header{
Name: path[len(dirPath)+1:],
Size: stat.Size(),
Mode: int64(stat.Mode()),
ModTime: stat.ModTime(),
}
err = tarWriter.WriteHeader(header)
if err != nil {
return err
}
_, err = io.Copy(tarWriter, file)
if err != nil {
return err
}
return nil
}
err = filepath.Walk(dirPath, walker)
if err != nil {
return "", err
}
return tarPath, nil
}
func pcap(tarPath string) error {
if strings.HasPrefix(tarPath, "s3://") {
var err error
tarPath, err = downloadTarFromS3(tarPath)
if err != nil {
log.Error().Err(err).Msg("Failed downloading from S3")
return err
}
}
log.Info().Str("tar-path", tarPath).Msg("Openning")
ctx := context.Background()
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
log.Error().Err(err).Send()
return err
}
defer cli.Close()
imageFront := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "front", config.Config.Tap.Docker.Tag)
imageHub := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "hub", config.Config.Tap.Docker.Tag)
imageWorker := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "worker", config.Config.Tap.Docker.Tag)
err = pullImages(ctx, cli, imageFront, imageHub, imageWorker)
if err != nil {
log.Error().Err(err).Send()
return err
}
tarFile, err := os.Open(tarPath)
if err != nil {
log.Error().Err(err).Send()
return err
}
defer tarFile.Close()
tarReader := bufio.NewReader(tarFile)
respFront, respHub, respWorker, workerIPAddr, err := createAndStartContainers(
ctx,
cli,
imageFront,
imageHub,
imageWorker,
tarReader,
)
if err != nil {
log.Error().Err(err).Send()
return err
}
workerPod := &v1.Pod{
Spec: v1.PodSpec{
NodeName: "docker",
},
Status: v1.PodStatus{
PodIP: workerIPAddr,
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
Ready: true,
},
},
},
}
connector = connect.NewConnector(kubernetes.GetHubUrl(), connect.DefaultRetries, connect.DefaultTimeout)
connector.PostWorkerPodToHub(workerPod)
// License
if config.Config.License != "" {
connector.PostLicense(config.Config.License)
}
log.Info().
Str("url", kubernetes.GetHubUrl()).
Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
url := kubernetes.GetProxyOnPort(config.Config.Tap.Proxy.Front.Port)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))
if !config.Config.HeadlessMode {
utils.OpenBrowser(url)
}
ctxC, cancel := context.WithCancel(context.Background())
defer cancel()
utils.WaitForTermination(ctxC, cancel)
err = stopAndRemoveContainers(ctx, cli, respFront, respHub, respWorker)
if err != nil {
log.Error().Err(err).Send()
return err
}
return nil
}