Refactor and simplify pcapdump logic (#1701)

* Fix spammy logs

* Fix err related to value missing from pcap config

* Test target dir only when provided

* Improve consistency of error handling

* Remove obsolete code

---------

Co-authored-by: bogdan.balan1 <bogdanvalentin.balan@1nce.com>
This commit is contained in:
bogdanvbalan
2025-01-27 23:42:59 +02:00
committed by GitHub
parent f2e60cdee1
commit 8f6ef686de
3 changed files with 218 additions and 178 deletions

View File

@@ -2,11 +2,14 @@ package cmd
import (
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/creasty/defaults"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
@@ -31,17 +34,23 @@ var pcapDumpCmd = &cobra.Command{
}
}
debugEnabled, _ := cmd.Flags().GetBool("debug")
if debugEnabled {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
log.Debug().Msg("Debug logging enabled")
} else {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}
// Use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Error().Err(err).Msg("Error building kubeconfig")
return err
return fmt.Errorf("Error building kubeconfig: %w", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error().Err(err).Msg("Error creating Kubernetes client")
return err
return fmt.Errorf("Error creating Kubernetes client: %w", err)
}
// Parse the `--time` flag
@@ -50,19 +59,35 @@ var pcapDumpCmd = &cobra.Command{
if timeIntervalStr != "" {
duration, err := time.ParseDuration(timeIntervalStr)
if err != nil {
log.Error().Err(err).Msg("Invalid time interval")
return err
return fmt.Errorf("Invalid format %w", err)
}
tempCutoffTime := time.Now().Add(-duration)
cutoffTime = &tempCutoffTime
}
// Handle copy operation if the copy string is provided
// Test the dest dir if provided
destDir, _ := cmd.Flags().GetString(configStructs.PcapDest)
if destDir != "" {
info, err := os.Stat(destDir)
if os.IsNotExist(err) {
return fmt.Errorf("Directory does not exist: %s", destDir)
}
if err != nil {
return fmt.Errorf("Error checking dest directory: %w", err)
}
if !info.IsDir() {
return fmt.Errorf("Dest path is not a directory: %s", destDir)
}
tempFile, err := os.CreateTemp(destDir, "write-test-*")
if err != nil {
return fmt.Errorf("Directory %s is not writable", destDir)
}
_ = os.Remove(tempFile.Name())
}
log.Info().Msg("Copying PCAP files")
err = copyPcapFiles(clientset, config, destDir, cutoffTime)
if err != nil {
log.Error().Err(err).Msg("Error copying PCAP files")
return err
}
@@ -81,4 +106,5 @@ func init() {
pcapDumpCmd.Flags().String(configStructs.PcapTime, "", "Time interval (e.g., 10m, 1h) in the past for which the pcaps are copied")
pcapDumpCmd.Flags().String(configStructs.PcapDest, "", "Local destination path for copied PCAP files (can not be used together with --enabled)")
pcapDumpCmd.Flags().String(configStructs.PcapKubeconfig, "", "Path for kubeconfig (if not provided the default location will be checked)")
pcapDumpCmd.Flags().Bool("debug", false, "Enable debug logging")
}

View File

@@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/kubeshark/gopacket/pcapgo"
@@ -23,20 +24,24 @@ import (
)
const (
label = "app.kubeshark.co/app=worker"
srcDir = "pcapdump"
label = "app.kubeshark.co/app=worker"
srcDir = "pcapdump"
maxSnaplen uint32 = 262144
maxTimePerFile = time.Minute * 5
)
// NamespaceFiles represents the namespace and the files found in that namespace.
type NamespaceFiles struct {
Namespace string // The namespace in which the files were found
SrcDir string // The source directory from which the files were listed
Files []string // List of files found in the namespace
// PodFileInfo represents information about a pod, its namespace, and associated files
type PodFileInfo struct {
Pod corev1.Pod
SrcDir string
Files []string
CopiedFiles []string
}
// listWorkerPods fetches all worker pods from multiple namespaces
func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespaces []string) ([]corev1.Pod, error) {
var allPods []corev1.Pod
func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespaces []string) ([]*PodFileInfo, error) {
var podFileInfos []*PodFileInfo
var errs []error
labelSelector := label
for _, namespace := range namespaces {
@@ -45,128 +50,30 @@ func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespa
LabelSelector: labelSelector,
})
if err != nil {
return nil, fmt.Errorf("failed to list worker pods in namespace %s: %w", namespace, err)
}
// Accumulate the pods
allPods = append(allPods, pods.Items...)
}
return allPods, nil
}
// listFilesInPodDir lists all files in the specified directory inside the pod across multiple namespaces
func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName string, namespaces []string, cutoffTime *time.Time) ([]NamespaceFiles, error) {
var namespaceFilesList []NamespaceFiles
for _, namespace := range namespaces {
// Attempt to get the pod in the current namespace
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to list worker pods in namespace %s: %w", namespace, err))
continue
}
nodeName := pod.Spec.NodeName
srcFilePath := filepath.Join("data", nodeName, srcDir)
cmd := []string{"ls", srcFilePath}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
log.Error().Err(err).Msgf("failed to initialize executor for pod %s in namespace %s", podName, namespace)
continue
}
var stdoutBuf bytes.Buffer
var stderrBuf bytes.Buffer
// Execute the command to list files
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf,
})
if err != nil {
log.Error().Err(err).Msgf("error listing files in pod %s in namespace %s: %s", podName, namespace, stderrBuf.String())
continue
}
// Split the output (file names) into a list
files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n")
if len(files) == 0 {
log.Info().Msgf("No files found in directory %s in pod %s", srcFilePath, podName)
continue
}
var filteredFiles []string
// Filter files based on cutoff time if provided
for _, file := range files {
if cutoffTime != nil {
parts := strings.Split(file, "-")
if len(parts) < 2 {
log.Warn().Msgf("Skipping file with invalid format: %s", file)
continue
}
timestampStr := parts[len(parts)-2] + parts[len(parts)-1][:6] // Extract YYYYMMDDHHMMSS
fileTime, err := time.Parse("20060102150405", timestampStr)
if err != nil {
log.Warn().Err(err).Msgf("Skipping file with unparsable timestamp: %s", file)
continue
}
if fileTime.Before(*cutoffTime) {
continue
}
}
// Add file to filtered list
filteredFiles = append(filteredFiles, file)
}
if len(filteredFiles) > 0 {
namespaceFilesList = append(namespaceFilesList, NamespaceFiles{
Namespace: namespace,
SrcDir: srcDir,
Files: filteredFiles,
for _, pod := range pods.Items {
podFileInfos = append(podFileInfos, &PodFileInfo{
Pod: pod,
})
}
}
if len(namespaceFilesList) == 0 {
return nil, fmt.Errorf("no files found in pod %s across the provided namespaces", podName)
}
return namespaceFilesList, nil
return podFileInfos, errors.Join(errs...)
}
// copyFileFromPod copies a single file from a pod to a local destination
func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcDir, srcFile, destFile string) error {
// Get the pod to retrieve its node name
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pod %s in namespace %s: %w", podName, namespace, err)
}
// listFilesInPodDir lists all files in the specified directory inside the pod across multiple namespaces
func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, pod *PodFileInfo, cutoffTime *time.Time) error {
nodeName := pod.Pod.Spec.NodeName
srcFilePath := filepath.Join("data", nodeName, srcDir)
// Construct the complete path using /data, the node name, srcDir, and srcFile
nodeName := pod.Spec.NodeName
srcFilePath := filepath.Join("data", nodeName, srcDir, srcFile)
// Execute the `cat` command to read the file at the srcFilePath
cmd := []string{"cat", srcFilePath}
cmd := []string{"ls", srcFilePath}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
Name(pod.Pod.Name).
Namespace(pod.Pod.Namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
@@ -176,7 +83,81 @@ func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, confi
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize executor for pod %s in namespace %s: %w", podName, namespace, err)
return err
}
var stdoutBuf bytes.Buffer
var stderrBuf bytes.Buffer
// Execute the command to list files
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf,
})
if err != nil {
return err
}
// Split the output (file names) into a list
files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n")
if len(files) == 0 {
// No files were found in the target dir for this pod
return nil
}
var filteredFiles []string
var fileProcessingErrs []error
// Filter files based on cutoff time if provided
for _, file := range files {
if cutoffTime != nil {
parts := strings.Split(file, "-")
if len(parts) < 2 {
continue
}
timestampStr := parts[len(parts)-2] + parts[len(parts)-1][:6] // Extract YYYYMMDDHHMMSS
fileTime, err := time.Parse("20060102150405", timestampStr)
if err != nil {
fileProcessingErrs = append(fileProcessingErrs, fmt.Errorf("failed parse file timestamp %s: %w", file, err))
continue
}
if fileTime.Before(*cutoffTime) {
continue
}
}
// Add file to filtered list
filteredFiles = append(filteredFiles, file)
}
pod.SrcDir = srcDir
pod.Files = filteredFiles
return errors.Join(fileProcessingErrs...)
}
// copyFileFromPod copies a single file from a pod to a local destination
func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, pod *PodFileInfo, srcFile, destFile string) error {
// Construct the complete path using /data, the node name, srcDir, and srcFile
nodeName := pod.Pod.Spec.NodeName
srcFilePath := filepath.Join("data", nodeName, srcDir, srcFile)
// Execute the `cat` command to read the file at the srcFilePath
cmd := []string{"cat", srcFilePath}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(pod.Pod.Name).
Namespace(pod.Pod.Namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize executor for pod %s in namespace %s: %w", pod.Pod.Name, pod.Pod.Namespace, err)
}
// Create the local file to write the content to
@@ -195,7 +176,7 @@ func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, confi
Stderr: &stderrBuf,
})
if err != nil {
return fmt.Errorf("error copying file from pod %s in namespace %s: %s", podName, namespace, stderrBuf.String())
return err
}
return nil
@@ -209,29 +190,45 @@ func mergePCAPs(outputFile string, inputFiles []string) error {
}
defer f.Close()
bufWriter := bufio.NewWriter(f)
bufWriter := bufio.NewWriterSize(f, 4*1024*1024)
defer bufWriter.Flush()
// Create the PCAP writer
writer := pcapgo.NewWriter(bufWriter)
err = writer.WriteFileHeader(65536, 1)
err = writer.WriteFileHeader(maxSnaplen, 1)
if err != nil {
return fmt.Errorf("failed to write PCAP file header: %w", err)
}
var mergingErrs []error
for _, inputFile := range inputFiles {
// Open the input file
file, err := os.Open(inputFile)
if err != nil {
log.Error().Err(err).Msgf("Failed to open %v", inputFile)
mergingErrs = append(mergingErrs, fmt.Errorf("failed to open %s: %w", inputFile, err))
continue
}
fileInfo, err := file.Stat()
if err != nil {
mergingErrs = append(mergingErrs, fmt.Errorf("failed to stat file %s: %w", inputFile, err))
file.Close()
continue
}
if fileInfo.Size() == 0 {
// Skip empty files
log.Debug().Msgf("Skipped empty file: %s", inputFile)
file.Close()
continue
}
defer file.Close()
// Create the PCAP reader for the input file
reader, err := pcapgo.NewReader(file)
if err != nil {
log.Error().Err(err).Msgf("Failed to create pcapng reader for %v", file.Name())
mergingErrs = append(mergingErrs, fmt.Errorf("failed to create pcapng reader for %v: %w", file.Name(), err))
file.Close()
continue
}
@@ -242,7 +239,7 @@ func mergePCAPs(outputFile string, inputFiles []string) error {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
break
}
log.Error().Err(err).Msgf("Error reading packet from file %s", inputFile)
mergingErrs = append(mergingErrs, fmt.Errorf("error reading packet from file %s: %w", file.Name(), err))
break
}
@@ -250,19 +247,23 @@ func mergePCAPs(outputFile string, inputFiles []string) error {
err = writer.WritePacket(ci, data)
if err != nil {
log.Error().Err(err).Msgf("Error writing packet to output file")
mergingErrs = append(mergingErrs, fmt.Errorf("error writing packet to output file: %w", err))
break
}
}
file.Close()
}
log.Debug().Err(errors.Join(mergingErrs...))
return nil
}
// copyPcapFiles function for copying the PCAP files from the worker pods
func copyPcapFiles(clientset *kubernetes.Clientset, config *rest.Config, destDir string, cutoffTime *time.Time) error {
// List all namespaces
namespaceList, err := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.Error().Err(err).Msg("Error listing namespaces")
return err
}
@@ -271,76 +272,87 @@ func copyPcapFiles(clientset *kubernetes.Clientset, config *rest.Config, destDir
targetNamespaces = append(targetNamespaces, ns.Name)
}
// List worker pods
// List all worker pods
workerPods, err := listWorkerPods(context.Background(), clientset, targetNamespaces)
if err != nil {
log.Warn().Err(err).Msg("Error listing worker pods")
return err
}
var currentFiles []string
// Iterate over each pod to get the PCAP directory from config and copy files
for _, pod := range workerPods {
// Get the list of NamespaceFiles (files per namespace) and their source directories
namespaceFiles, err := listFilesInPodDir(context.Background(), clientset, config, pod.Name, targetNamespaces, cutoffTime)
if err != nil {
log.Warn().Err(err).Send()
continue
if len(workerPods) == 0 {
return err
}
log.Debug().Err(err).Msg("error while listing worker pods")
}
// Copy each file from the pod to the local destination for each namespace
for _, nsFiles := range namespaceFiles {
for _, file := range nsFiles.Files {
var wg sync.WaitGroup
// Launch a goroutine for each pod
for _, pod := range workerPods {
wg.Add(1)
go func(pod *PodFileInfo) {
defer wg.Done()
// List files for the current pod
err := listFilesInPodDir(context.Background(), clientset, config, pod, cutoffTime)
if err != nil {
log.Debug().Err(err).Msgf("error listing files in pod %s", pod.Pod.Name)
return
}
// Copy files from the pod
for _, file := range pod.Files {
destFile := filepath.Join(destDir, file)
// Pass the correct namespace and related details to the function
err = copyFileFromPod(context.Background(), clientset, config, pod.Name, nsFiles.Namespace, nsFiles.SrcDir, file, destFile)
// Add a timeout context for file copy
ctx, cancel := context.WithTimeout(context.Background(), maxTimePerFile)
err := copyFileFromPod(ctx, clientset, config, pod, file, destFile)
cancel()
if err != nil {
log.Error().Err(err).Msgf("Error copying file from pod %s in namespace %s", pod.Name, nsFiles.Namespace)
} else {
log.Info().Msgf("Copied %s from %s to %s", file, pod.Name, destFile)
log.Debug().Err(err).Msgf("error copying file %s from pod %s in namespace %s", file, pod.Pod.Name, pod.Pod.Namespace)
continue
}
currentFiles = append(currentFiles, destFile)
log.Info().Msgf("Copied file %s from pod %s to %s", file, pod.Pod.Name, destFile)
pod.CopiedFiles = append(pod.CopiedFiles, destFile)
}
}
}(pod)
}
if len(currentFiles) == 0 {
log.Error().Msgf("No files to merge")
// Wait for all goroutines to complete
wg.Wait()
var copiedFiles []string
for _, pod := range workerPods {
copiedFiles = append(copiedFiles, pod.CopiedFiles...)
}
if len(copiedFiles) == 0 {
log.Info().Msg("No pcaps available to copy on the workers")
return nil
// continue
}
// Generate a temporary filename based on the first file
tempMergedFile := currentFiles[0] + "_temp"
// Generate a temporary filename for the merged file
tempMergedFile := copiedFiles[0] + "_temp"
// Merge the PCAPs into the temporary file
err = mergePCAPs(tempMergedFile, currentFiles)
// Merge PCAP files
err = mergePCAPs(tempMergedFile, copiedFiles)
if err != nil {
log.Error().Err(err).Msgf("Error merging files")
return err
// continue
os.Remove(tempMergedFile)
return fmt.Errorf("error merging files: %w", err)
}
// Remove the original files after merging
for _, file := range currentFiles {
err := os.Remove(file)
if err != nil {
log.Error().Err(err).Msgf("Error removing file %s", file)
for _, file := range copiedFiles {
if err := os.Remove(file); err != nil {
log.Debug().Err(err).Msgf("error removing file %s", file)
}
}
// Rename the temp file to the final name (removing "_temp")
// Rename the temp file to the final name
finalMergedFile := strings.TrimSuffix(tempMergedFile, "_temp")
err = os.Rename(tempMergedFile, finalMergedFile)
if err != nil {
log.Error().Err(err).Msgf("Error renaming merged file %s", tempMergedFile)
// continue
return err
}
log.Info().Msgf("Merged file created: %s", finalMergedFile)
return nil
}

View File

@@ -238,6 +238,8 @@ type PcapDumpConfig struct {
PcapMaxTime string `yaml:"maxTime" json:"maxTime" default:"1h"`
PcapMaxSize string `yaml:"maxSize" json:"maxSize" default:"500MB"`
PcapTime string `yaml:"time" json:"time" default:"time"`
PcapDebug bool `yaml:"debug" json:"debug" default:"false"`
PcapDest string `yaml:"dest" json:"dest" default:""`
}
type PortMapping struct {