package cmd

import (
	"archive/tar"
	"bufio"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsConfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/docker/go-connections/nat"
	"github.com/kubeshark/kubeshark/config"
	"github.com/kubeshark/kubeshark/config/configStructs"
	"github.com/kubeshark/kubeshark/internal/connect"
	"github.com/kubeshark/kubeshark/kubernetes"
	"github.com/kubeshark/kubeshark/misc"
	"github.com/kubeshark/kubeshark/utils"
	"github.com/rs/zerolog/log"
	v1 "k8s.io/api/core/v1"
)
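
// logPullingImage streams the JSON progress events emitted by a Docker image
// pull and logs each layer's id, status and progress.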
func logPullingImage(image string, reader io.ReadCloser) {
	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		text := scanner.Text()
		var data map[string]interface{}
		if err := json.Unmarshal([]byte(text), &data); err != nil {
			log.Error().Err(err).Send()
			continue
		}

		var id string
		if val, ok := data["id"]; ok {
			id = val.(string)
		}

		var status string
		if val, ok := data["status"]; ok {
			status = val.(string)
		}

		var progress string
		if val, ok := data["progress"]; ok {
			progress = val.(string)
		}

		e := log.Info()
		if image != "" {
			e = e.Str("image", image)
		}

		if progress != "" {
			e = e.Str("progress", progress)
		}

		e.Msg(fmt.Sprintf("[%-12s] %-18s", id, status))
	}
}
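
// pullImages pulls the front, hub and worker images through the Docker client
// and logs the pull progress of each one.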
func pullImages(ctx context.Context, cli *client.Client, imageFront string, imageHub string, imageWorker string) error {
	log.Info().Msg("Pulling images...")
	readerFront, err := cli.ImagePull(ctx, imageFront, types.ImagePullOptions{})
	if err != nil {
		return err
	}
	defer readerFront.Close()
	logPullingImage(imageFront, readerFront)

	readerHub, err := cli.ImagePull(ctx, imageHub, types.ImagePullOptions{})
	if err != nil {
		return err
	}
	defer readerHub.Close()
	logPullingImage(imageHub, readerHub)

	readerWorker, err := cli.ImagePull(ctx, imageWorker, types.ImagePullOptions{})
	if err != nil {
		return err
	}
	defer readerWorker.Close()
	logPullingImage(imageWorker, readerWorker)

	return nil
}
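
// cleanUpOldContainers force-removes any leftover containers from a previous
// run whose names match the given front, hub or worker container names.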
func cleanUpOldContainers(
	ctx context.Context,
	cli *client.Client,
	nameFront string,
	nameHub string,
	nameWorker string,
) error {
	containers, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true})
	if err != nil {
		return err
	}

	for _, container := range containers {
		f := fmt.Sprintf("/%s", nameFront)
		h := fmt.Sprintf("/%s", nameHub)
		w := fmt.Sprintf("/%s", nameWorker)
		if utils.Contains(container.Names, f) || utils.Contains(container.Names, h) || utils.Contains(container.Names, w) {
			err = cli.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true})
			if err != nil {
				return err
			}
		}
	}

	return nil
}
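
// createAndStartContainers removes any stale containers, creates the front,
// hub and worker containers with their port bindings, copies the tarball into
// the worker's /app/import directory, starts all three containers and returns
// their create responses together with the worker's IP address.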
func createAndStartContainers(
	ctx context.Context,
	cli *client.Client,
	imageFront string,
	imageHub string,
	imageWorker string,
	tarReader io.Reader,
) (
	respFront container.ContainerCreateCreatedBody,
	respHub container.ContainerCreateCreatedBody,
	respWorker container.ContainerCreateCreatedBody,
	workerIPAddr string,
	err error,
) {
	log.Info().Msg("Creating containers...")

	nameFront := fmt.Sprintf("%s-front", misc.Program)
	nameHub := fmt.Sprintf("%s-hub", misc.Program)
	nameWorker := fmt.Sprintf("%s-worker", misc.Program)

	err = cleanUpOldContainers(ctx, cli, nameFront, nameHub, nameWorker)
	if err != nil {
		return
	}

	hostIP := "0.0.0.0"

	hostConfigFront := &container.HostConfig{
		PortBindings: nat.PortMap{
			nat.Port(fmt.Sprintf("%d/tcp", configStructs.ContainerPort)): []nat.PortBinding{
				{
					HostIP:   hostIP,
					HostPort: fmt.Sprintf("%d", config.Config.Tap.Proxy.Front.Port),
				},
			},
		},
	}

	respFront, err = cli.ContainerCreate(ctx, &container.Config{
		Image: imageFront,
		Tty:   false,
		Env: []string{
			"REACT_APP_DEFAULT_FILTER= ",
			"REACT_APP_HUB_HOST= ",
			fmt.Sprintf("REACT_APP_HUB_PORT=:%d", config.Config.Tap.Proxy.Hub.Port),
		},
	}, hostConfigFront, nil, nil, nameFront)
	if err != nil {
		return
	}

	hostConfigHub := &container.HostConfig{
		PortBindings: nat.PortMap{
			nat.Port(fmt.Sprintf("%d/tcp", config.Config.Tap.Proxy.Hub.SrvPort)): []nat.PortBinding{
				{
					HostIP:   hostIP,
					HostPort: fmt.Sprintf("%d", config.Config.Tap.Proxy.Hub.Port),
				},
			},
		},
	}

	cmdHub := []string{"-port", fmt.Sprintf("%d", config.Config.Tap.Proxy.Hub.SrvPort)}
	if config.DebugMode {
		cmdHub = append(cmdHub, fmt.Sprintf("-%s", config.DebugFlag))
	}

	respHub, err = cli.ContainerCreate(ctx, &container.Config{
		Image:        imageHub,
		Cmd:          cmdHub,
		Tty:          false,
		ExposedPorts: nat.PortSet{nat.Port(fmt.Sprintf("%d/tcp", config.Config.Tap.Proxy.Hub.SrvPort)): {}},
	}, hostConfigHub, nil, nil, nameHub)
	if err != nil {
		return
	}

	cmdWorker := []string{"-f", "./import", "-port", fmt.Sprintf("%d", config.Config.Tap.Proxy.Worker.SrvPort)}
	if config.DebugMode {
		cmdWorker = append(cmdWorker, fmt.Sprintf("-%s", config.DebugFlag))
	}

	respWorker, err = cli.ContainerCreate(ctx, &container.Config{
		Image: imageWorker,
		Cmd:   cmdWorker,
		Tty:   false,
	}, nil, nil, nil, nameWorker)
	if err != nil {
		return
	}

	if err = cli.CopyToContainer(ctx, respWorker.ID, "/app/import", tarReader, types.CopyToContainerOptions{}); err != nil {
		return
	}

	log.Info().Msg("Starting containers...")

	if err = cli.ContainerStart(ctx, respFront.ID, types.ContainerStartOptions{}); err != nil {
		return
	}

	if err = cli.ContainerStart(ctx, respHub.ID, types.ContainerStartOptions{}); err != nil {
		return
	}

	if err = cli.ContainerStart(ctx, respWorker.ID, types.ContainerStartOptions{}); err != nil {
		return
	}

	var containerWorker types.ContainerJSON
	containerWorker, err = cli.ContainerInspect(ctx, respWorker.ID)
	if err != nil {
		return
	}

	workerIPAddr = containerWorker.NetworkSettings.IPAddress

	return
}
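
// stopAndRemoveContainers stops the front, hub and worker containers and then
// removes them.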
func stopAndRemoveContainers(
	ctx context.Context,
	cli *client.Client,
	respFront container.ContainerCreateCreatedBody,
	respHub container.ContainerCreateCreatedBody,
	respWorker container.ContainerCreateCreatedBody,
) (err error) {
	log.Warn().Msg("Stopping containers...")
	err = cli.ContainerStop(ctx, respFront.ID, nil)
	if err != nil {
		return
	}
	err = cli.ContainerStop(ctx, respHub.ID, nil)
	if err != nil {
		return
	}
	err = cli.ContainerStop(ctx, respWorker.ID, nil)
	if err != nil {
		return
	}

	log.Warn().Msg("Removing containers...")
	err = cli.ContainerRemove(ctx, respFront.ID, types.ContainerRemoveOptions{})
	if err != nil {
		return
	}
	err = cli.ContainerRemove(ctx, respHub.ID, types.ContainerRemoveOptions{})
	if err != nil {
		return
	}
	err = cli.ContainerRemove(ctx, respWorker.ID, types.ContainerRemoveOptions{})
	if err != nil {
		return
	}

	return
}
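
// downloadTarFromS3 downloads the object referenced by an s3:// URL into a
// temporary file. If the URL does not resolve to a single object, the key is
// treated as a prefix: every matching object is downloaded into a temporary
// directory, which is then packed into a .tar.gz archive. The path of the
// downloaded file or archive is returned.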
func downloadTarFromS3(s3Url string) (tarPath string, err error) {
	u, err := url.Parse(s3Url)
	if err != nil {
		return
	}

	bucket := u.Host
	key := u.Path[1:]

	var cfg aws.Config
	cfg, err = awsConfig.LoadDefaultConfig(context.TODO())
	if err != nil {
		return
	}

	client := s3.NewFromConfig(cfg)

	var listObjectsOutput *s3.ListObjectsV2Output
	listObjectsOutput, err = client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
		Prefix: aws.String(key),
	})
	if err != nil {
		return
	}

	var file *os.File
	file, err = os.CreateTemp(os.TempDir(), fmt.Sprintf("%s_*.%s", strings.TrimSuffix(filepath.Base(key), filepath.Ext(key)), filepath.Ext(key)))
	if err != nil {
		return
	}
	defer file.Close()

	log.Info().Str("bucket", bucket).Str("key", key).Msg("Downloading from S3")

	downloader := manager.NewDownloader(client)
	_, err = downloader.Download(context.TODO(), file, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		log.Info().Err(err).Msg("S3 object not found. Assuming the URL is not a single object. Listing the objects under the given prefix or bucket to download...")

		var tempDirPath string
		tempDirPath, err = os.MkdirTemp(os.TempDir(), "kubeshark_*")
		if err != nil {
			return
		}

		var wg sync.WaitGroup
		for _, object := range listObjectsOutput.Contents {
			wg.Add(1)
			go func(object s3Types.Object) {
				defer wg.Done()
				objectKey := *object.Key

				// Use goroutine-local errors instead of the shared err to avoid a data race
				// between the concurrent downloads; failures are logged and skipped.
				fullPath := filepath.Join(tempDirPath, objectKey)
				if mkdirErr := os.MkdirAll(filepath.Dir(fullPath), os.ModePerm); mkdirErr != nil {
					log.Error().Err(mkdirErr).Str("key", objectKey).Send()
					return
				}

				objectFile, createErr := os.Create(fullPath)
				if createErr != nil {
					log.Error().Err(createErr).Str("key", objectKey).Send()
					return
				}
				defer objectFile.Close()

				log.Info().Str("bucket", bucket).Str("key", objectKey).Msg("Downloading from S3")

				downloader := manager.NewDownloader(client)
				if _, downloadErr := downloader.Download(context.TODO(), objectFile, &s3.GetObjectInput{
					Bucket: aws.String(bucket),
					Key:    aws.String(objectKey),
				}); downloadErr != nil {
					log.Error().Err(downloadErr).Str("key", objectKey).Send()
				}
			}(object)
		}
		wg.Wait()

		tarPath, err = tarDirectory(tempDirPath)
		return
	}

	tarPath = file.Name()

	return
}
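
// tarDirectory packs the contents of dirPath into a gzip-compressed tar
// archive alongside it and returns the path of the resulting .tar.gz file.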
func tarDirectory(dirPath string) (string, error) {
	tarPath := fmt.Sprintf("%s.tar.gz", dirPath)

	var file *os.File
	file, err := os.Create(tarPath)
	if err != nil {
		return "", err
	}
	defer file.Close()

	gzipWriter := gzip.NewWriter(file)
	defer gzipWriter.Close()

	tarWriter := tar.NewWriter(gzipWriter)
	defer tarWriter.Close()

	walker := func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			return nil
		}
		file, err := os.Open(path)
		if err != nil {
			return err
		}
		defer file.Close()

		stat, err := file.Stat()
		if err != nil {
			return err
		}

		header := &tar.Header{
			Name:    path[len(dirPath)+1:],
			Size:    stat.Size(),
			Mode:    int64(stat.Mode()),
			ModTime: stat.ModTime(),
		}

		err = tarWriter.WriteHeader(header)
		if err != nil {
			return err
		}

		_, err = io.Copy(tarWriter, file)
		if err != nil {
			return err
		}

		return nil
	}

	err = filepath.Walk(dirPath, walker)
	if err != nil {
		return "", err
	}

	return tarPath, nil
}
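
// pcap runs the front, hub and worker containers locally with Docker and feeds
// them the given PCAP tarball. If the path is an s3:// URL, the tarball is
// first downloaded (or assembled) from S3. The function then registers a
// synthetic worker pod with the hub, prints the access URLs, optionally opens
// the browser and blocks until termination, after which it stops and removes
// the containers.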
func pcap(tarPath string) error {
	if strings.HasPrefix(tarPath, "s3://") {
		var err error
		tarPath, err = downloadTarFromS3(tarPath)
		if err != nil {
			log.Error().Err(err).Msg("Failed downloading from S3")
			return err
		}
	}

	log.Info().Str("tar-path", tarPath).Msg("Opening")

	ctx := context.Background()
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		log.Error().Err(err).Send()
		return err
	}
	defer cli.Close()

	imageFront := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "front", config.Config.Tap.Docker.Tag)
	imageHub := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "hub", config.Config.Tap.Docker.Tag)
	imageWorker := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "worker", config.Config.Tap.Docker.Tag)

	err = pullImages(ctx, cli, imageFront, imageHub, imageWorker)
	if err != nil {
		log.Error().Err(err).Send()
		return err
	}

	tarFile, err := os.Open(tarPath)
	if err != nil {
		log.Error().Err(err).Send()
		return err
	}
	defer tarFile.Close()
	tarReader := bufio.NewReader(tarFile)

	respFront, respHub, respWorker, workerIPAddr, err := createAndStartContainers(
		ctx,
		cli,
		imageFront,
		imageHub,
		imageWorker,
		tarReader,
	)
	if err != nil {
		log.Error().Err(err).Send()
		return err
	}

	workerPod := &v1.Pod{
		Spec: v1.PodSpec{
			NodeName: "docker",
		},
		Status: v1.PodStatus{
			PodIP: workerIPAddr,
			Phase: v1.PodRunning,
			ContainerStatuses: []v1.ContainerStatus{
				{
					Ready: true,
				},
			},
		},
	}

	connector = connect.NewConnector(kubernetes.GetHubUrl(), connect.DefaultRetries, connect.DefaultTimeout)
	connector.PostWorkerPodToHub(workerPod)

	// License
	if config.Config.License != "" {
		connector.PostLicense(config.Config.License)
	}

	log.Info().
		Str("url", kubernetes.GetHubUrl()).
		Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))

	url := kubernetes.GetProxyOnPort(config.Config.Tap.Proxy.Front.Port)
	log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))

	if !config.Config.HeadlessMode {
		utils.OpenBrowser(url)
	}

	ctxC, cancel := context.WithCancel(context.Background())
	defer cancel()
	utils.WaitForTermination(ctxC, cancel)

	err = stopAndRemoveContainers(ctx, cli, respFront, respHub, respWorker)
	if err != nil {
		log.Error().Err(err).Send()
		return err
	}

	return nil
}