mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-09 06:21:57 +00:00
TRA-3234 Fetch command (#54)
* preparation to fetch command * get har as zip from server * no message * no message
This commit is contained in:
@@ -1,20 +1,28 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
type MizuFetchOptions struct {
|
||||
Limit uint16
|
||||
Directory string
|
||||
}
|
||||
|
||||
var mizuFetchOptions = MizuFetchOptions{}
|
||||
|
||||
var fetchCmd = &cobra.Command{
|
||||
Use: "fetch",
|
||||
Short: "Download recorded traffic",
|
||||
Short: "Download recorded traffic to files",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
fmt.Println("Not implemented")
|
||||
RunMizuFetch(&mizuFetchOptions)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(fetchCmd)
|
||||
|
||||
fetchCmd.Flags().Uint16VarP(&mizuFetchOptions.Limit, "limit", "l", 1000, "Provide a custom limit for entries to fetch")
|
||||
fetchCmd.Flags().StringVarP(&mizuFetchOptions.Directory, "directory", "d", ".", "Provide a custom directory for fetched entries")
|
||||
}
|
||||
|
92
cli/cmd/fetchRunner.go
Normal file
92
cli/cmd/fetchRunner.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func RunMizuFetch(fetch *MizuFetchOptions) {
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:8899/api/har?limit=%v", fetch.Limit))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
_ = Unzip(zipReader, fetch.Directory)
|
||||
|
||||
}
|
||||
|
||||
func Unzip(reader *zip.Reader, dest string) error {
|
||||
dest, _ = filepath.Abs(dest)
|
||||
_ = os.MkdirAll(dest, os.ModePerm)
|
||||
|
||||
// Closure to address file descriptors issue with all the deferred .Close() methods
|
||||
extractAndWriteFile := func(f *zip.File) error {
|
||||
rc, err := f.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := rc.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
path := filepath.Join(dest, f.Name)
|
||||
|
||||
// Check for ZipSlip (Directory traversal)
|
||||
if !strings.HasPrefix(path, filepath.Clean(dest) + string(os.PathSeparator)) {
|
||||
return fmt.Errorf("illegal file path: %s", path)
|
||||
}
|
||||
|
||||
if f.FileInfo().IsDir() {
|
||||
_ = os.MkdirAll(path, f.Mode())
|
||||
} else {
|
||||
_ = os.MkdirAll(filepath.Dir(path), f.Mode())
|
||||
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := f.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = io.Copy(f, rc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, f := range reader.File {
|
||||
err := extractAndWriteFile(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -8,7 +8,7 @@ var rootCmd = &cobra.Command{
|
||||
Use: "mizu",
|
||||
Short: "A web traffic viewer for kubernetes",
|
||||
Long: `A web traffic viewer for kubernetes
|
||||
Further info is available at https://github.com/up9inc/mizu`,
|
||||
Further info is available at https://github.com/up9inc/mizu`,
|
||||
}
|
||||
|
||||
// Execute adds all child commands to the root command and sets flags appropriately.
|
||||
|
@@ -3,14 +3,23 @@ package cmd
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"regexp"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
)
|
||||
|
||||
type MizuTapOptions struct {
|
||||
GuiPort uint16
|
||||
Namespace string
|
||||
KubeConfigPath string
|
||||
MizuImage string
|
||||
MizuPodPort uint16
|
||||
}
|
||||
|
||||
|
||||
var mizuTapOptions = &MizuTapOptions{}
|
||||
|
||||
var tapCmd = &cobra.Command{
|
||||
Use: "tap [POD REGEX]",
|
||||
Short: "Record ingoing traffic of a kubernetes pod",
|
||||
@@ -20,7 +29,7 @@ var tapCmd = &cobra.Command{
|
||||
if len(args) == 0 {
|
||||
return errors.New("POD REGEX argument is required")
|
||||
} else if len(args) > 1 {
|
||||
return errors.New("Unexpected number of arguments")
|
||||
return errors.New("unexpected number of arguments")
|
||||
}
|
||||
|
||||
regex, err := regexp.Compile(args[0])
|
||||
@@ -28,7 +37,7 @@ var tapCmd = &cobra.Command{
|
||||
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
|
||||
}
|
||||
|
||||
mizu.Run(regex)
|
||||
RunMizuTap(regex, mizuTapOptions)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
@@ -36,9 +45,9 @@ var tapCmd = &cobra.Command{
|
||||
func init() {
|
||||
rootCmd.AddCommand(tapCmd)
|
||||
|
||||
tapCmd.Flags().Uint16VarP(&config.Configuration.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
|
||||
tapCmd.Flags().StringVarP(&config.Configuration.Namespace, "namespace", "n", "", "Namespace selector")
|
||||
tapCmd.Flags().StringVarP(&config.Configuration.KubeConfigPath, "kubeconfig", "k", "", "Path to kubeconfig file")
|
||||
tapCmd.Flags().StringVarP(&config.Configuration.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
|
||||
tapCmd.Flags().Uint16VarP(&config.Configuration.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
|
||||
tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
|
||||
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
|
||||
}
|
||||
|
189
cli/cmd/tapRunner.go
Normal file
189
cli/cmd/tapRunner.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel() // cancel will be called when this function exits
|
||||
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, podRegexQuery)
|
||||
if err != nil {
|
||||
cleanUpMizuResources(kubernetesProvider)
|
||||
return
|
||||
}
|
||||
err = createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions)
|
||||
if err != nil {
|
||||
cleanUpMizuResources(kubernetesProvider)
|
||||
return
|
||||
}
|
||||
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) //TODO convert this to job for built in pod ttl or have the running app handle this
|
||||
waitForFinish(ctx, cancel) //block until exit signal or error
|
||||
|
||||
// TODO handle incoming traffic from tapper using a channel
|
||||
|
||||
//cleanup
|
||||
fmt.Printf("\nRemoving mizu resources\n")
|
||||
cleanUpMizuResources(kubernetesProvider)
|
||||
}
|
||||
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
|
||||
mizuServiceAccountExists := createRBACIfNecessary(ctx, kubernetesProvider)
|
||||
_, err := kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating mizu collector pod: %v\n", err)
|
||||
return err
|
||||
}
|
||||
aggregatorService, err := kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, mizu.AggregatorPodName)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating mizu collector service: %v\n", err)
|
||||
return err
|
||||
}
|
||||
err = kubernetesProvider.CreateMizuTapperDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName, tappingOptions.MizuImage, mizu.TapperPodName, fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), nodeToTappedPodIPMap, mizuServiceAccountExists)
|
||||
if err != nil {
|
||||
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
|
||||
removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
|
||||
if err := kubernetesProvider.RemovePod(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
|
||||
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
|
||||
}
|
||||
if err := kubernetesProvider.RemoveService(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
|
||||
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
|
||||
}
|
||||
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err);
|
||||
}
|
||||
}
|
||||
|
||||
// will be relevant in the future
|
||||
//func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp) {
|
||||
// added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
|
||||
// for {
|
||||
// select {
|
||||
// case newTarget := <- added:
|
||||
// fmt.Printf("+%s\n", newTarget.Name)
|
||||
//
|
||||
// case removedTarget := <- removed:
|
||||
// fmt.Printf("-%s\n", removedTarget.Name)
|
||||
//
|
||||
// case <- modified:
|
||||
// continue
|
||||
//
|
||||
// case <- errorChan:
|
||||
// cancel()
|
||||
//
|
||||
// case <- ctx.Done():
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.AggregatorPodName))
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
|
||||
isPodReady := false
|
||||
var portForward *kubernetes.PortForward
|
||||
for {
|
||||
select {
|
||||
case <- added:
|
||||
continue
|
||||
case <- removed:
|
||||
fmt.Printf("%s removed\n", mizu.AggregatorPodName)
|
||||
cancel()
|
||||
return
|
||||
case modifiedPod := <- modified:
|
||||
if modifiedPod.Status.Phase == "Running" && !isPodReady {
|
||||
isPodReady = true
|
||||
var err error
|
||||
portForward, err = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel)
|
||||
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
|
||||
if err != nil {
|
||||
fmt.Printf("error forwarding port to pod %s\n", err)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
case <- time.After(25 * time.Second):
|
||||
if !isPodReady {
|
||||
fmt.Printf("error: %s pod was not ready in time", mizu.AggregatorPodName)
|
||||
cancel()
|
||||
}
|
||||
|
||||
case <- errorChan:
|
||||
cancel()
|
||||
|
||||
case <- ctx.Done():
|
||||
if portForward != nil {
|
||||
portForward.Stop()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
|
||||
mizuRBACExists, err := kubernetesProvider.DoesMizuRBACExist(ctx, mizu.ResourcesNamespace)
|
||||
if err != nil {
|
||||
fmt.Printf("warning: could not ensure mizu rbac resources exist %v\n", err)
|
||||
return false
|
||||
}
|
||||
if !mizuRBACExists {
|
||||
var versionString = mizu.Version
|
||||
if mizu.GitCommitHash != "" {
|
||||
versionString += "-" + mizu.GitCommitHash
|
||||
}
|
||||
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, versionString)
|
||||
if err != nil {
|
||||
fmt.Printf("warning: could not create mizu rbac resources %v\n", err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, regex *regexp.Regexp) (map[string][]string, error) {
|
||||
matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, regex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeToTappedPodIPMap := make(map[string][]string, 0)
|
||||
for _, pod := range matchingPods {
|
||||
existingList := nodeToTappedPodIPMap[pod.Spec.NodeName]
|
||||
if existingList == nil {
|
||||
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string {pod.Status.PodIP}
|
||||
} else {
|
||||
nodeToTappedPodIPMap[pod.Spec.NodeName] = append(nodeToTappedPodIPMap[pod.Spec.NodeName], pod.Status.PodIP)
|
||||
}
|
||||
}
|
||||
return nodeToTappedPodIPMap, nil
|
||||
}
|
||||
|
||||
func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
// block until ctx cancel is called or termination signal is received
|
||||
select {
|
||||
case <- ctx.Done():
|
||||
break
|
||||
case <- sigChan:
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -11,7 +11,7 @@ var versionCmd = &cobra.Command{
|
||||
Use: "version",
|
||||
Short: "Print version info",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
fmt.Printf("%s %s\n", mizu.Version, mizu.GitCommitHash)
|
||||
fmt.Printf("%s (%s) %s\n", mizu.Version, mizu.Branch, mizu.GitCommitHash)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
@@ -2,7 +2,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
Reference in New Issue
Block a user