From 1720f19b451bb5ecc53e9223c52bba945252ef3a Mon Sep 17 00:00:00 2001 From: "bogdan.balan1" Date: Fri, 27 Sep 2024 18:34:05 +0300 Subject: [PATCH] Add cmd to copy pcaps from worker --- cmd/pcapDumpCopy.go | 100 +++++++ cmd/pcapDumpRunner.go | 259 ++++++++++++++++++ cmd/pcapDumpStart.go | 101 +++++++ cmd/pcapDumpStop.go | 83 ++++++ config/configStructs/tapConfig.go | 6 + .../templates/09-worker-daemon-set.yaml | 12 +- helm-chart/values.yaml | 6 + 7 files changed, 565 insertions(+), 2 deletions(-) create mode 100644 cmd/pcapDumpCopy.go create mode 100644 cmd/pcapDumpRunner.go create mode 100644 cmd/pcapDumpStart.go create mode 100644 cmd/pcapDumpStop.go diff --git a/cmd/pcapDumpCopy.go b/cmd/pcapDumpCopy.go new file mode 100644 index 000000000..3ce3194f1 --- /dev/null +++ b/cmd/pcapDumpCopy.go @@ -0,0 +1,100 @@ +package cmd + +import ( + "context" + "path/filepath" + + "github.com/creasty/defaults" + "github.com/kubeshark/kubeshark/config/configStructs" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" +) + +const ( + configPath = "/app/config/pcap_config.txt" + namespace = "default" +) + +// pcapCopyCmd represents the pcapcopy command +var pcapCopyCmd = &cobra.Command{ + Use: "pcapcopy", + Short: "Copy PCAP files from worker pods to the local destination", + RunE: func(cmd *cobra.Command, args []string) error { + // Create Kubernetes client + kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config") + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + log.Error().Err(err).Msg("Error building kubeconfig") + return err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error().Err(err).Msg("Error creating Kubernetes client") + return err + } + + // List worker pods + workerPods, err := listWorkerPods(context.Background(), clientset, namespace) + if err != nil { + log.Error().Err(err).Msg("Error listing worker pods") + return err + } + + // Destination directory for the files + destDir, _ := cmd.Flags().GetString(configStructs.PcapDumpCopy) + + // Iterate over each pod to get the PCAP directory from config and copy files + for _, pod := range workerPods.Items { + // Read the config file from the pod to get the PCAP_DIR value + configMap, err := readConfigFileFromPod(context.Background(), clientset, config, pod.Name, namespace, configPath) + if err != nil { + log.Error().Err(err).Msgf("Error reading config file from pod %s", pod.Name) + continue + } + + // Use the PCAP_DIR value from the config file + srcDir := configMap["PCAP_DIR"] + if srcDir == "" { + log.Error().Msgf("PCAP_DIR not found in config for pod %s", pod.Name) + continue + } + + // List files in the PCAP directory on the pod + files, err := listFilesInPodDir(context.Background(), clientset, config, pod.Name, namespace, srcDir) + if err != nil { + log.Error().Err(err).Msgf("Error listing files in pod %s", pod.Name) + continue + } + + // Copy each file from the pod to the local destination + for _, file := range files { + srcFile := filepath.Join(srcDir, file) + destFile := filepath.Join(destDir, pod.Name+"_"+file) + + err = copyFileFromPod(context.Background(), clientset, config, pod.Name, namespace, srcFile, destFile) + if err != nil { + log.Error().Err(err).Msgf("Error copying file from pod %s", pod.Name) + } + } + } + + return nil + }, +} + +func init() { + rootCmd.AddCommand(pcapCopyCmd) + + defaultTapConfig := configStructs.TapConfig{} + if err := defaults.Set(&defaultTapConfig); err != nil { + log.Debug().Err(err).Send() + } + + // Use full flag name without shorthand + pcapCopyCmd.Flags().String(configStructs.PcapDumpCopy, defaultTapConfig.Misc.PcapDest, "Local destination path for the copied files (required)") + _ = pcapCopyCmd.MarkFlagRequired("dest") +} diff --git a/cmd/pcapDumpRunner.go b/cmd/pcapDumpRunner.go new file mode 100644 index 000000000..946adcfec --- /dev/null +++ b/cmd/pcapDumpRunner.go @@ -0,0 +1,259 @@ +package cmd + +import ( + "bytes" + "context" + "fmt" + "os" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +const label = "app.kubeshark.co/app=worker" + +// listWorkerPods fetches all the worker pods using the Kubernetes client +func listWorkerPods(ctx context.Context, clientset *kubernetes.Clientset, namespace string) (*corev1.PodList, error) { + labelSelector := label + + // List all pods matching the label + pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list worker pods: %w", err) + } + + return pods, nil +} + +// listFilesInPodDir lists all files in the specified directory inside the pod +func listFilesInPodDir(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcDir string) ([]string, error) { + cmd := []string{"ls", srcDir} + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec"). + Param("container", "sniffer"). + Param("stdout", "true"). + Param("stderr", "true"). + Param("command", cmd[0]). + Param("command", cmd[1]) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return nil, fmt.Errorf("failed to initialize executor: %w", err) + } + + // Buffer to capture stdout (file listing) + var stdoutBuf bytes.Buffer + var stderrBuf bytes.Buffer + + // Stream the result of the ls command + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdoutBuf, + Stderr: &stderrBuf, // Capture stderr for better debugging + }) + if err != nil { + return nil, fmt.Errorf("error listing files in pod: %w. Stderr: %s", err, stderrBuf.String()) + } + + // Split the output (file names) into a list + files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n") + return files, nil +} + +// copyFileFromPod copies a single file from a pod to a local destination +func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcFile, destFile string) error { + cmd := []string{"cat", srcFile} + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec"). + Param("container", "sniffer"). + Param("stdout", "true"). + Param("stderr", "true"). + Param("command", cmd[0]). + Param("command", cmd[1]) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return fmt.Errorf("failed to initialize executor: %w", err) + } + + // Create a local file to write the content to + outFile, err := os.Create(destFile) + if err != nil { + return fmt.Errorf("failed to create destination file: %w", err) + } + defer outFile.Close() + + // Capture stderr for error logging + var stderrBuf bytes.Buffer + + // Stream the file content from the pod to the local file + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: outFile, + Stderr: &stderrBuf, // Capture stderr for better debugging + }) + if err != nil { + return fmt.Errorf("error copying file from pod: %w. Stderr: %s", err, stderrBuf.String()) + } + + fmt.Printf("File from pod %s copied to local destination: %s\n", podName, destFile) + return nil +} + +// updatePodEnvVars updates the configuration file inside the worker pod +func updatePodEnvVars(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace string, stop bool, timeInterval, maxTime, maxSize string) error { + var envVars []string + if stop { + envVars = append(envVars, "PCAP_DUMP_ENABLE=false") + } else { + envVars = append(envVars, "PCAP_DUMP_ENABLE=true") + + if timeInterval != "" { + envVars = append(envVars, fmt.Sprintf("TIME_INTERVAL=%s", timeInterval)) + } + if maxTime != "" { + envVars = append(envVars, fmt.Sprintf("MAX_TIME=%s", maxTime)) + } + if maxSize != "" { + envVars = append(envVars, fmt.Sprintf("MAX_SIZE=%s", maxSize)) + } + } + + // Create a command that sets the environment variables directly in the pod + for _, envVar := range envVars { + cmd := []string{"sh", "-c", fmt.Sprintf("export %s", envVar)} + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec"). + Param("container", "sniffer"). // Assuming container is called 'sniffer' + Param("stdout", "true"). + Param("stderr", "true"). + Param("command", cmd[0]). + Param("command", cmd[1]). + Param("command", cmd[2]) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return fmt.Errorf("failed to initialize executor for pod %s: %w", podName, err) + } + + var stdoutBuf, stderrBuf bytes.Buffer + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdoutBuf, + Stderr: &stderrBuf, + }) + if err != nil { + return fmt.Errorf("failed to update env vars in pod %s: %w. Stderr: %s", podName, err, stderrBuf.String()) + } + } + + fmt.Printf("Updated env vars for pod %s\n", podName) + return nil +} + +// readConfigFileFromPod reads the configuration file from the pod +func readConfigFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, configFilePath string) (map[string]string, error) { + cmd := []string{"cat", configFilePath} + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec"). + Param("container", "sniffer"). + Param("stdout", "true"). + Param("stderr", "true"). + Param("command", cmd[0]). + Param("command", cmd[1]) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return nil, fmt.Errorf("failed to initialize executor for pod %s: %w", podName, err) + } + + var stdoutBuf, stderrBuf bytes.Buffer + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdoutBuf, + Stderr: &stderrBuf, + }) + if err != nil { + return nil, fmt.Errorf("failed to read config file from pod %s: %w. Stderr: %s", podName, err, stderrBuf.String()) + } + + // Parse the config content into a map of key-value pairs + configMap := parseConfigContent(stdoutBuf.String()) + return configMap, nil +} + +// writeConfigFileToPod writes the updated configuration map to the file in the pod +func writeConfigFileToPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, configFilePath string, configMap map[string]string) error { + // Convert the config map back to a string format for writing + configContent := formatConfigMapToString(configMap) + + // Escape any single quotes in the config content to avoid issues in the shell command + escapedConfigContent := strings.ReplaceAll(configContent, "'", "'\\''") + + // Prepare the command to write the configuration to the file + cmd := []string{"sh", "-c", fmt.Sprintf("echo '%s' > %s", escapedConfigContent, configFilePath)} + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(podName). + Namespace(namespace). + SubResource("exec"). + Param("container", "sniffer"). + Param("stdout", "true"). + Param("stderr", "true"). + Param("command", cmd[0]). + Param("command", cmd[1]). + Param("command", cmd[2]) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return fmt.Errorf("failed to initialize executor for pod %s: %w", podName, err) + } + + var stdoutBuf, stderrBuf bytes.Buffer + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdoutBuf, + Stderr: &stderrBuf, + }) + if err != nil { + return fmt.Errorf("failed to write config file to pod %s: %w. Stderr: %s", podName, err, stderrBuf.String()) + } + + return nil +} + +// parseConfigContent parses the content of the config file into a map of key-value pairs +func parseConfigContent(content string) map[string]string { + configMap := make(map[string]string) + lines := strings.Split(content, "\n") + for _, line := range lines { + if strings.TrimSpace(line) == "" || !strings.Contains(line, "=") { + continue + } + parts := strings.SplitN(line, "=", 2) + configMap[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1]) + } + return configMap +} + +// formatConfigMapToString converts the config map back to string format +func formatConfigMapToString(configMap map[string]string) string { + var sb strings.Builder + for key, value := range configMap { + sb.WriteString(fmt.Sprintf("%s=%s\n", key, value)) + } + return sb.String() +} diff --git a/cmd/pcapDumpStart.go b/cmd/pcapDumpStart.go new file mode 100644 index 000000000..a6e8da73b --- /dev/null +++ b/cmd/pcapDumpStart.go @@ -0,0 +1,101 @@ +package cmd + +import ( + "context" + "fmt" + "path/filepath" + + "github.com/creasty/defaults" + "github.com/kubeshark/kubeshark/config/configStructs" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" +) + +// pcapCmd represents the pcapstart command +var pcapStartCmd = &cobra.Command{ + Use: "pcapstart", + Short: "Start capturing traffic and generate a PCAP file", + RunE: func(cmd *cobra.Command, args []string) error { + // Create Kubernetes client + kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config") + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + log.Error().Err(err).Msg("Error building kubeconfig") + return err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error().Err(err).Msg("Error creating Kubernetes client") + return err + } + + // List worker pods + workerPods, err := listWorkerPods(context.Background(), clientset, namespace) + if err != nil { + log.Error().Err(err).Msg("Error listing worker pods") + return err + } + + // Read the flags for time interval, max time, and max size + timeInterval, _ := cmd.Flags().GetString("time-interval") + maxTime, _ := cmd.Flags().GetString("max-time") + maxSize, _ := cmd.Flags().GetString("max-size") + + // Iterate over each pod to start the PCAP capture by updating the config file + for _, pod := range workerPods.Items { + err := writeStartConfigToFileInPod(context.Background(), clientset, config, pod.Name, namespace, timeInterval, maxTime, maxSize) + if err != nil { + log.Error().Err(err).Msgf("Error updating config file for pod %s", pod.Name) + continue + } + fmt.Printf("PCAP capture started for pod %s\n", pod.Name) + } + + return nil + }, +} + +func init() { + rootCmd.AddCommand(pcapStartCmd) + + defaultTapConfig := configStructs.TapConfig{} + if err := defaults.Set(&defaultTapConfig); err != nil { + log.Debug().Err(err).Send() + } + + // Use full flag name without shorthand + pcapStartCmd.Flags().String("time-interval", defaultTapConfig.Misc.TimeInterval, "Time interval for PCAP file rotation (e.g., 1m, 2h)") + pcapStartCmd.Flags().String("max-time", defaultTapConfig.Misc.MaxTime, "Maximum time for retaining old PCAP files (e.g., 24h)") + pcapStartCmd.Flags().String("max-size", defaultTapConfig.Misc.MaxSize, "Maximum size of PCAP files before deletion (e.g., 500MB, 10GB)") +} + +// writeStartConfigToFileInPod writes config to start pcap in the worker pods +func writeStartConfigToFileInPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, timeInterval, maxTime, maxSize string) error { + existingConfig, err := readConfigFileFromPod(ctx, clientset, config, podName, namespace, configPath) + if err != nil { + return fmt.Errorf("failed to read config file from pod %s: %w", podName, err) + } + + existingConfig["PCAP_DUMP_ENABLE"] = "true" + if timeInterval != "" { + existingConfig["TIME_INTERVAL"] = timeInterval + } + if maxTime != "" { + existingConfig["MAX_TIME"] = maxTime + } + if maxSize != "" { + existingConfig["MAX_SIZE"] = maxSize + } + + err = writeConfigFileToPod(ctx, clientset, config, podName, namespace, configPath, existingConfig) + if err != nil { + return fmt.Errorf("failed to write config file to pod %s: %w", podName, err) + } + + return nil +} diff --git a/cmd/pcapDumpStop.go b/cmd/pcapDumpStop.go new file mode 100644 index 000000000..a69fd8745 --- /dev/null +++ b/cmd/pcapDumpStop.go @@ -0,0 +1,83 @@ +package cmd + +import ( + "context" + "fmt" + "path/filepath" + + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" +) + +// pcapstopCmd represents the pcapstop command +var pcapStopCmd = &cobra.Command{ + Use: "pcapstop", + Short: "Stop capturing traffic and close the PCAP dump", + RunE: func(cmd *cobra.Command, args []string) error { + // Call the function to stop PCAP capture + return stopPcap(cmd) + }, +} + +func init() { + rootCmd.AddCommand(pcapStopCmd) +} + +func stopPcap(cmd *cobra.Command) error { + fmt.Println("Stopping PCAP capture.") + + // Load Kubernetes configuration + kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config") + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + log.Error().Err(err).Msg("Error building kubeconfig") + return err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error().Err(err).Msg("Error creating Kubernetes client") + return err + } + + // Get the list of worker pods + workerPods, err := listWorkerPods(context.Background(), clientset, namespace) + if err != nil { + log.Error().Err(err).Msg("Error listing worker pods") + return err + } + + // Iterate over the worker pods and set config to stop pcap + for _, pod := range workerPods.Items { + err := writeStopConfigToFileInPod(context.Background(), clientset, config, pod.Name, namespace) + if err != nil { + log.Error().Err(err).Msgf("Error updating config file for pod %s", pod.Name) + continue + } + fmt.Printf("PCAP capture stopped for pod %s\n", pod.Name) + } + + fmt.Println("PCAP capture stopped successfully.") + return nil +} + +// writeStopConfigToFileInPod reads the existing config, updates the PCAP_DUMP_ENABLE value, and writes it back to the file +func writeStopConfigToFileInPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace string) error { + existingConfig, err := readConfigFileFromPod(ctx, clientset, config, podName, namespace, configPath) + if err != nil { + return fmt.Errorf("failed to read config file from pod %s: %w", podName, err) + } + + existingConfig["PCAP_DUMP_ENABLE"] = "false" + + err = writeConfigFileToPod(ctx, clientset, config, podName, namespace, configPath, existingConfig) + if err != nil { + return fmt.Errorf("failed to write config file to pod %s: %w", podName, err) + } + + return nil +} diff --git a/config/configStructs/tapConfig.go b/config/configStructs/tapConfig.go index 034d2b920..4dd91b7c5 100644 --- a/config/configStructs/tapConfig.go +++ b/config/configStructs/tapConfig.go @@ -34,6 +34,7 @@ const ( DebugLabel = "debug" ContainerPort = 80 ContainerPortStr = "80" + PcapDumpCopy = "dest" ) type ResourceLimitsHub struct { @@ -169,6 +170,11 @@ type MiscConfig struct { Profile bool `yaml:"profile" json:"profile" default:"false"` DuplicateTimeframe string `yaml:"duplicateTimeframe" json:"duplicateTimeframe" default:"200ms"` DetectDuplicates bool `yaml:"detectDuplicates" json:"detectDuplicates" default:"false"` + PcapDumpEnable bool `yaml:"pcapDumpEnable" json:"pcapDumpEnable" default:"false"` + TimeInterval string `yaml:"timeInterval" json:"timeInterval" default:"1m"` + MaxTime string `yaml:"maxTime" json:"maxTime" default:"24h"` + MaxSize string `yaml:"maxSize" json:"maxSize" default:"500MB"` + PcapDest string `yaml:"pcapDest" json:"pcapDest" default:"."` } type TapConfig struct { diff --git a/helm-chart/templates/09-worker-daemon-set.yaml b/helm-chart/templates/09-worker-daemon-set.yaml index ac19384e0..af1ccf266 100644 --- a/helm-chart/templates/09-worker-daemon-set.yaml +++ b/helm-chart/templates/09-worker-daemon-set.yaml @@ -71,9 +71,9 @@ spec: - -debug {{- end }} {{- if .Values.tap.docker.overrideTag.worker }} - image: '{{ .Values.tap.docker.registry }}/worker:{{ .Values.tap.docker.overrideTag.worker }}{{ include "kubeshark.dockerTagDebugVersion" . }}' + image: '192.168.49.2:5000/kubeshark/worker:dev' {{ else }} - image: '{{ .Values.tap.docker.registry }}/worker:{{ not (eq .Values.tap.docker.tag "") | ternary .Values.tap.docker.tag (printf "v%s" .Chart.Version) }}{{ include "kubeshark.dockerTagDebugVersion" . }}' + image: '192.168.49.2:5000/kubeshark/worker:dev' {{- end }} imagePullPolicy: {{ .Values.tap.docker.imagePullPolicy }} name: sniffer @@ -98,6 +98,14 @@ spec: value: 'https://api.kubeshark.co' - name: PROFILING_ENABLED value: '{{ .Values.tap.misc.profile }}' + - name: PCAP_DUMP_ENABLE + value: '{{ .Values.tap.worker.env.pcapDumpEnable }}' + - name: TIME_INTERVAL + value: '{{ .Values.tap.worker.env.timeInterval }}' + - name: MAX_TIME + value: '{{ .Values.tap.worker.env.maxTime }}' + - name: MAX_SIZE + value: '{{ .Values.tap.worker.env.maxSize }}' resources: limits: cpu: {{ .Values.tap.resources.sniffer.limits.cpu }} diff --git a/helm-chart/values.yaml b/helm-chart/values.yaml index 9970a5c9a..806324b84 100644 --- a/helm-chart/values.yaml +++ b/helm-chart/values.yaml @@ -140,6 +140,12 @@ tap: profile: false duplicateTimeframe: 200ms detectDuplicates: false + worker: + env: + pcapDumpEnable: true + timeInterval: 1m + maxTime: 2h + maxSize: 10GB logs: file: "" grep: ""