diff --git a/cmd/pcapDump.go b/cmd/pcapDump.go index 1eeac6968..5653642b6 100644 --- a/cmd/pcapDump.go +++ b/cmd/pcapDump.go @@ -2,11 +2,14 @@ package cmd import ( "errors" + "fmt" + "os" "path/filepath" "time" "github.com/creasty/defaults" "github.com/kubeshark/kubeshark/config/configStructs" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" @@ -31,17 +34,23 @@ var pcapDumpCmd = &cobra.Command{ } } + debugEnabled, _ := cmd.Flags().GetBool("debug") + if debugEnabled { + zerolog.SetGlobalLevel(zerolog.DebugLevel) + log.Debug().Msg("Debug logging enabled") + } else { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + } + // Use the current context in kubeconfig config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { - log.Error().Err(err).Msg("Error building kubeconfig") - return err + return fmt.Errorf("Error building kubeconfig: %w", err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { - log.Error().Err(err).Msg("Error creating Kubernetes client") - return err + return fmt.Errorf("Error creating Kubernetes client: %w", err) } // Parse the `--time` flag @@ -50,19 +59,35 @@ var pcapDumpCmd = &cobra.Command{ if timeIntervalStr != "" { duration, err := time.ParseDuration(timeIntervalStr) if err != nil { - log.Error().Err(err).Msg("Invalid time interval") - return err + return fmt.Errorf("Invalid format %w", err) } tempCutoffTime := time.Now().Add(-duration) cutoffTime = &tempCutoffTime } - // Handle copy operation if the copy string is provided + // Test the dest dir if provided destDir, _ := cmd.Flags().GetString(configStructs.PcapDest) + if destDir != "" { + info, err := os.Stat(destDir) + if os.IsNotExist(err) { + return fmt.Errorf("Directory does not exist: %s", destDir) + } + if err != nil { + return fmt.Errorf("Error checking dest directory: %w", err) + } + if !info.IsDir() { + return fmt.Errorf("Dest path is not a directory: %s", destDir) + } + tempFile, err := os.CreateTemp(destDir, "write-test-*") + if err != nil { + return fmt.Errorf("Directory %s is not writable", destDir) + } + _ = os.Remove(tempFile.Name()) + } + log.Info().Msg("Copying PCAP files") err = copyPcapFiles(clientset, config, destDir, cutoffTime) if err != nil { - log.Error().Err(err).Msg("Error copying PCAP files") return err } @@ -81,4 +106,5 @@ func init() { pcapDumpCmd.Flags().String(configStructs.PcapTime, "", "Time interval (e.g., 10m, 1h) in the past for which the pcaps are copied") pcapDumpCmd.Flags().String(configStructs.PcapDest, "", "Local destination path for copied PCAP files (can not be used together with --enabled)") pcapDumpCmd.Flags().String(configStructs.PcapKubeconfig, "", "Path for kubeconfig (if not provided the default location will be checked)") + pcapDumpCmd.Flags().Bool("debug", false, "Enable debug logging") } diff --git a/cmd/pcapDumpRunner.go b/cmd/pcapDumpRunner.go index 3b51c72da..52947a5f8 100644 --- a/cmd/pcapDumpRunner.go +++ b/cmd/pcapDumpRunner.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/kubeshark/gopacket/pcapgo" @@ -23,20 +24,24 @@ import ( ) const ( - label = "app.kubeshark.co/app=worker" - srcDir = "pcapdump" + label = "app.kubeshark.co/app=worker" + srcDir = "pcapdump" + maxSnaplen uint32 = 262144 + maxTimePerFile = time.Minute * 5 ) -// NamespaceFiles represents the namespace and the files found in that namespace. -type NamespaceFiles struct { - Namespace string // The namespace in which the files were found - SrcDir string // The source directory from which the files were listed - Files []string // List of files found in the namespace +// PodFileInfo represents information about a pod, its namespace, and associated files +type PodFileInfo struct { + Pod corev1.Pod + SrcDir string + Files []string + CopiedFiles []string } // listWorkerPods fetches all worker pods from multiple namespaces -func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespaces []string) ([]corev1.Pod, error) { - var allPods []corev1.Pod +func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespaces []string) ([]*PodFileInfo, error) { + var podFileInfos []*PodFileInfo + var errs []error labelSelector := label for _, namespace := range namespaces { @@ -45,128 +50,30 @@ func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespa LabelSelector: labelSelector, }) if err != nil { - return nil, fmt.Errorf("failed to list worker pods in namespace %s: %w", namespace, err) - } - - // Accumulate the pods - allPods = append(allPods, pods.Items...) - } - - return allPods, nil -} - -// listFilesInPodDir lists all files in the specified directory inside the pod across multiple namespaces -func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName string, namespaces []string, cutoffTime *time.Time) ([]NamespaceFiles, error) { - var namespaceFilesList []NamespaceFiles - - for _, namespace := range namespaces { - // Attempt to get the pod in the current namespace - pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { + errs = append(errs, fmt.Errorf("failed to list worker pods in namespace %s: %w", namespace, err)) continue } - nodeName := pod.Spec.NodeName - srcFilePath := filepath.Join("data", nodeName, srcDir) - - cmd := []string{"ls", srcFilePath} - req := clientset.CoreV1().RESTClient().Post(). - Resource("pods"). - Name(podName). - Namespace(namespace). - SubResource("exec"). - Param("container", "sniffer"). - Param("stdout", "true"). - Param("stderr", "true"). - Param("command", cmd[0]). - Param("command", cmd[1]) - - exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) - if err != nil { - log.Error().Err(err).Msgf("failed to initialize executor for pod %s in namespace %s", podName, namespace) - continue - } - - var stdoutBuf bytes.Buffer - var stderrBuf bytes.Buffer - - // Execute the command to list files - err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ - Stdout: &stdoutBuf, - Stderr: &stderrBuf, - }) - if err != nil { - log.Error().Err(err).Msgf("error listing files in pod %s in namespace %s: %s", podName, namespace, stderrBuf.String()) - continue - } - - // Split the output (file names) into a list - files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n") - if len(files) == 0 { - log.Info().Msgf("No files found in directory %s in pod %s", srcFilePath, podName) - continue - } - - var filteredFiles []string - - // Filter files based on cutoff time if provided - for _, file := range files { - if cutoffTime != nil { - parts := strings.Split(file, "-") - if len(parts) < 2 { - log.Warn().Msgf("Skipping file with invalid format: %s", file) - continue - } - - timestampStr := parts[len(parts)-2] + parts[len(parts)-1][:6] // Extract YYYYMMDDHHMMSS - fileTime, err := time.Parse("20060102150405", timestampStr) - if err != nil { - log.Warn().Err(err).Msgf("Skipping file with unparsable timestamp: %s", file) - continue - } - - if fileTime.Before(*cutoffTime) { - continue - } - } - // Add file to filtered list - filteredFiles = append(filteredFiles, file) - } - - if len(filteredFiles) > 0 { - namespaceFilesList = append(namespaceFilesList, NamespaceFiles{ - Namespace: namespace, - SrcDir: srcDir, - Files: filteredFiles, + for _, pod := range pods.Items { + podFileInfos = append(podFileInfos, &PodFileInfo{ + Pod: pod, }) } } - if len(namespaceFilesList) == 0 { - return nil, fmt.Errorf("no files found in pod %s across the provided namespaces", podName) - } - - return namespaceFilesList, nil + return podFileInfos, errors.Join(errs...) } -// copyFileFromPod copies a single file from a pod to a local destination -func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcDir, srcFile, destFile string) error { - // Get the pod to retrieve its node name - pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get pod %s in namespace %s: %w", podName, namespace, err) - } +// listFilesInPodDir lists all files in the specified directory inside the pod across multiple namespaces +func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, pod *PodFileInfo, cutoffTime *time.Time) error { + nodeName := pod.Pod.Spec.NodeName + srcFilePath := filepath.Join("data", nodeName, srcDir) - // Construct the complete path using /data, the node name, srcDir, and srcFile - nodeName := pod.Spec.NodeName - srcFilePath := filepath.Join("data", nodeName, srcDir, srcFile) - - // Execute the `cat` command to read the file at the srcFilePath - cmd := []string{"cat", srcFilePath} + cmd := []string{"ls", srcFilePath} req := clientset.CoreV1().RESTClient().Post(). Resource("pods"). - Name(podName). - Namespace(namespace). + Name(pod.Pod.Name). + Namespace(pod.Pod.Namespace). SubResource("exec"). Param("container", "sniffer"). Param("stdout", "true"). @@ -176,7 +83,81 @@ func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, confi exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) if err != nil { - return fmt.Errorf("failed to initialize executor for pod %s in namespace %s: %w", podName, namespace, err) + return err + } + + var stdoutBuf bytes.Buffer + var stderrBuf bytes.Buffer + + // Execute the command to list files + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdoutBuf, + Stderr: &stderrBuf, + }) + if err != nil { + return err + } + + // Split the output (file names) into a list + files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n") + if len(files) == 0 { + // No files were found in the target dir for this pod + return nil + } + + var filteredFiles []string + var fileProcessingErrs []error + // Filter files based on cutoff time if provided + for _, file := range files { + if cutoffTime != nil { + parts := strings.Split(file, "-") + if len(parts) < 2 { + continue + } + + timestampStr := parts[len(parts)-2] + parts[len(parts)-1][:6] // Extract YYYYMMDDHHMMSS + fileTime, err := time.Parse("20060102150405", timestampStr) + if err != nil { + fileProcessingErrs = append(fileProcessingErrs, fmt.Errorf("failed parse file timestamp %s: %w", file, err)) + continue + } + + if fileTime.Before(*cutoffTime) { + continue + } + } + // Add file to filtered list + filteredFiles = append(filteredFiles, file) + } + + pod.SrcDir = srcDir + pod.Files = filteredFiles + + return errors.Join(fileProcessingErrs...) +} + +// copyFileFromPod copies a single file from a pod to a local destination +func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, pod *PodFileInfo, srcFile, destFile string) error { + // Construct the complete path using /data, the node name, srcDir, and srcFile + nodeName := pod.Pod.Spec.NodeName + srcFilePath := filepath.Join("data", nodeName, srcDir, srcFile) + + // Execute the `cat` command to read the file at the srcFilePath + cmd := []string{"cat", srcFilePath} + req := clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(pod.Pod.Name). + Namespace(pod.Pod.Namespace). + SubResource("exec"). + Param("container", "sniffer"). + Param("stdout", "true"). + Param("stderr", "true"). + Param("command", cmd[0]). + Param("command", cmd[1]) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + return fmt.Errorf("failed to initialize executor for pod %s in namespace %s: %w", pod.Pod.Name, pod.Pod.Namespace, err) } // Create the local file to write the content to @@ -195,7 +176,7 @@ func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, confi Stderr: &stderrBuf, }) if err != nil { - return fmt.Errorf("error copying file from pod %s in namespace %s: %s", podName, namespace, stderrBuf.String()) + return err } return nil @@ -209,29 +190,45 @@ func mergePCAPs(outputFile string, inputFiles []string) error { } defer f.Close() - bufWriter := bufio.NewWriter(f) + bufWriter := bufio.NewWriterSize(f, 4*1024*1024) defer bufWriter.Flush() // Create the PCAP writer writer := pcapgo.NewWriter(bufWriter) - err = writer.WriteFileHeader(65536, 1) + err = writer.WriteFileHeader(maxSnaplen, 1) if err != nil { return fmt.Errorf("failed to write PCAP file header: %w", err) } + var mergingErrs []error + for _, inputFile := range inputFiles { // Open the input file file, err := os.Open(inputFile) if err != nil { - log.Error().Err(err).Msgf("Failed to open %v", inputFile) + mergingErrs = append(mergingErrs, fmt.Errorf("failed to open %s: %w", inputFile, err)) + continue + } + + fileInfo, err := file.Stat() + if err != nil { + mergingErrs = append(mergingErrs, fmt.Errorf("failed to stat file %s: %w", inputFile, err)) + file.Close() + continue + } + + if fileInfo.Size() == 0 { + // Skip empty files + log.Debug().Msgf("Skipped empty file: %s", inputFile) + file.Close() continue } - defer file.Close() // Create the PCAP reader for the input file reader, err := pcapgo.NewReader(file) if err != nil { - log.Error().Err(err).Msgf("Failed to create pcapng reader for %v", file.Name()) + mergingErrs = append(mergingErrs, fmt.Errorf("failed to create pcapng reader for %v: %w", file.Name(), err)) + file.Close() continue } @@ -242,7 +239,7 @@ func mergePCAPs(outputFile string, inputFiles []string) error { if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { break } - log.Error().Err(err).Msgf("Error reading packet from file %s", inputFile) + mergingErrs = append(mergingErrs, fmt.Errorf("error reading packet from file %s: %w", file.Name(), err)) break } @@ -250,19 +247,23 @@ func mergePCAPs(outputFile string, inputFiles []string) error { err = writer.WritePacket(ci, data) if err != nil { log.Error().Err(err).Msgf("Error writing packet to output file") + mergingErrs = append(mergingErrs, fmt.Errorf("error writing packet to output file: %w", err)) break } } + + file.Close() } + log.Debug().Err(errors.Join(mergingErrs...)) + return nil } -// copyPcapFiles function for copying the PCAP files from the worker pods func copyPcapFiles(clientset *kubernetes.Clientset, config *rest.Config, destDir string, cutoffTime *time.Time) error { + // List all namespaces namespaceList, err := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) if err != nil { - log.Error().Err(err).Msg("Error listing namespaces") return err } @@ -271,76 +272,87 @@ func copyPcapFiles(clientset *kubernetes.Clientset, config *rest.Config, destDir targetNamespaces = append(targetNamespaces, ns.Name) } - // List worker pods + // List all worker pods workerPods, err := listWorkerPods(context.Background(), clientset, targetNamespaces) if err != nil { - log.Warn().Err(err).Msg("Error listing worker pods") - return err - } - var currentFiles []string - - // Iterate over each pod to get the PCAP directory from config and copy files - for _, pod := range workerPods { - // Get the list of NamespaceFiles (files per namespace) and their source directories - namespaceFiles, err := listFilesInPodDir(context.Background(), clientset, config, pod.Name, targetNamespaces, cutoffTime) - if err != nil { - log.Warn().Err(err).Send() - continue + if len(workerPods) == 0 { + return err } + log.Debug().Err(err).Msg("error while listing worker pods") + } - // Copy each file from the pod to the local destination for each namespace - for _, nsFiles := range namespaceFiles { - for _, file := range nsFiles.Files { + var wg sync.WaitGroup + + // Launch a goroutine for each pod + for _, pod := range workerPods { + wg.Add(1) + + go func(pod *PodFileInfo) { + defer wg.Done() + + // List files for the current pod + err := listFilesInPodDir(context.Background(), clientset, config, pod, cutoffTime) + if err != nil { + log.Debug().Err(err).Msgf("error listing files in pod %s", pod.Pod.Name) + return + } + + // Copy files from the pod + for _, file := range pod.Files { destFile := filepath.Join(destDir, file) - // Pass the correct namespace and related details to the function - err = copyFileFromPod(context.Background(), clientset, config, pod.Name, nsFiles.Namespace, nsFiles.SrcDir, file, destFile) + // Add a timeout context for file copy + ctx, cancel := context.WithTimeout(context.Background(), maxTimePerFile) + err := copyFileFromPod(ctx, clientset, config, pod, file, destFile) + cancel() if err != nil { - log.Error().Err(err).Msgf("Error copying file from pod %s in namespace %s", pod.Name, nsFiles.Namespace) - } else { - log.Info().Msgf("Copied %s from %s to %s", file, pod.Name, destFile) + log.Debug().Err(err).Msgf("error copying file %s from pod %s in namespace %s", file, pod.Pod.Name, pod.Pod.Namespace) + continue } - currentFiles = append(currentFiles, destFile) + log.Info().Msgf("Copied file %s from pod %s to %s", file, pod.Pod.Name, destFile) + pod.CopiedFiles = append(pod.CopiedFiles, destFile) } - } + }(pod) } - if len(currentFiles) == 0 { - log.Error().Msgf("No files to merge") + // Wait for all goroutines to complete + wg.Wait() + + var copiedFiles []string + for _, pod := range workerPods { + copiedFiles = append(copiedFiles, pod.CopiedFiles...) + } + + if len(copiedFiles) == 0 { + log.Info().Msg("No pcaps available to copy on the workers") return nil - // continue } - // Generate a temporary filename based on the first file - tempMergedFile := currentFiles[0] + "_temp" + // Generate a temporary filename for the merged file + tempMergedFile := copiedFiles[0] + "_temp" - // Merge the PCAPs into the temporary file - err = mergePCAPs(tempMergedFile, currentFiles) + // Merge PCAP files + err = mergePCAPs(tempMergedFile, copiedFiles) if err != nil { - log.Error().Err(err).Msgf("Error merging files") - return err - // continue + os.Remove(tempMergedFile) + return fmt.Errorf("error merging files: %w", err) } // Remove the original files after merging - for _, file := range currentFiles { - err := os.Remove(file) - if err != nil { - log.Error().Err(err).Msgf("Error removing file %s", file) + for _, file := range copiedFiles { + if err := os.Remove(file); err != nil { + log.Debug().Err(err).Msgf("error removing file %s", file) } } - // Rename the temp file to the final name (removing "_temp") + // Rename the temp file to the final name finalMergedFile := strings.TrimSuffix(tempMergedFile, "_temp") err = os.Rename(tempMergedFile, finalMergedFile) if err != nil { - log.Error().Err(err).Msgf("Error renaming merged file %s", tempMergedFile) - // continue return err } log.Info().Msgf("Merged file created: %s", finalMergedFile) - return nil } diff --git a/config/configStructs/tapConfig.go b/config/configStructs/tapConfig.go index 3c3c9dcb4..4bac6e18a 100644 --- a/config/configStructs/tapConfig.go +++ b/config/configStructs/tapConfig.go @@ -238,6 +238,8 @@ type PcapDumpConfig struct { PcapMaxTime string `yaml:"maxTime" json:"maxTime" default:"1h"` PcapMaxSize string `yaml:"maxSize" json:"maxSize" default:"500MB"` PcapTime string `yaml:"time" json:"time" default:"time"` + PcapDebug bool `yaml:"debug" json:"debug" default:"false"` + PcapDest string `yaml:"dest" json:"dest" default:""` } type PortMapping struct {