kubeshark/cmd/pcapDumpRunner.go
bogdanvbalan f5637972f2
Add --time param to pcapdump (#1664)
* Add --time param to pcapdump

* Update description

* Remove obsolete code

* Revert config change

* Add time to pcap config

---------

Co-authored-by: bogdan.balan1 <bogdanvalentin.balan@1nce.com>
Co-authored-by: Alon Girmonsky <1990761+alongir@users.noreply.github.com>
2024-12-16 08:29:40 -08:00

346 lines
10 KiB
Go

package cmd
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/kubeshark/gopacket"
"github.com/kubeshark/gopacket/layers"
"github.com/kubeshark/gopacket/pcapgo"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
clientk8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)
const label = "app.kubeshark.co/app=worker"
const SELF_RESOURCES_PREFIX = "kubeshark-"
const SUFFIX_CONFIG_MAP = "config-map"
// NamespaceFiles represents the namespace and the files found in that namespace.
type NamespaceFiles struct {
Namespace string // The namespace in which the files were found
SrcDir string // The source directory from which the files were listed
Files []string // List of files found in the namespace
}
// listWorkerPods fetches all worker pods from multiple namespaces
func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespaces []string) ([]corev1.Pod, error) {
var allPods []corev1.Pod
labelSelector := label
for _, namespace := range namespaces {
// List all pods matching the label in the current namespace
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, fmt.Errorf("failed to list worker pods in namespace %s: %w", namespace, err)
}
// Accumulate the pods
allPods = append(allPods, pods.Items...)
}
return allPods, nil
}
// listFilesInPodDir lists all files in the specified directory inside the pod across multiple namespaces
func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName string, namespaces []string, configMapName, configMapKey string, cutoffTime *time.Time) ([]NamespaceFiles, error) {
var namespaceFilesList []NamespaceFiles
for _, namespace := range namespaces {
// Attempt to get the ConfigMap in the current namespace
configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
continue
}
// Check if the source directory exists in the ConfigMap
srcDir, ok := configMap.Data[configMapKey]
if !ok || srcDir == "" {
log.Error().Msgf("source directory not found in ConfigMap %s in namespace %s", configMapName, namespace)
continue
}
// Attempt to get the pod in the current namespace
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
log.Error().Err(err).Msgf("failed to get pod %s in namespace %s", podName, namespace)
continue
}
nodeName := pod.Spec.NodeName
srcFilePath := filepath.Join("data", nodeName, srcDir)
cmd := []string{"ls", srcFilePath}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
log.Error().Err(err).Msgf("failed to initialize executor for pod %s in namespace %s", podName, namespace)
continue
}
var stdoutBuf bytes.Buffer
var stderrBuf bytes.Buffer
// Execute the command to list files
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf,
})
if err != nil {
log.Error().Err(err).Msgf("error listing files in pod %s in namespace %s: %s", podName, namespace, stderrBuf.String())
continue
}
// Split the output (file names) into a list
files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n")
if len(files) == 0 {
log.Info().Msgf("No files found in directory %s in pod %s", srcFilePath, podName)
continue
}
var filteredFiles []string
// Filter files based on cutoff time if provided
for _, file := range files {
if cutoffTime != nil {
parts := strings.Split(file, "-")
if len(parts) < 2 {
log.Warn().Msgf("Skipping file with invalid format: %s", file)
continue
}
timestampStr := parts[len(parts)-2] + parts[len(parts)-1][:6] // Extract YYYYMMDDHHMMSS
fileTime, err := time.Parse("20060102150405", timestampStr)
if err != nil {
log.Warn().Err(err).Msgf("Skipping file with unparsable timestamp: %s", file)
continue
}
if fileTime.Before(*cutoffTime) {
continue
}
}
// Add file to filtered list
filteredFiles = append(filteredFiles, file)
}
if len(filteredFiles) > 0 {
namespaceFilesList = append(namespaceFilesList, NamespaceFiles{
Namespace: namespace,
SrcDir: srcDir,
Files: filteredFiles,
})
}
}
if len(namespaceFilesList) == 0 {
return nil, fmt.Errorf("no files found in pod %s across the provided namespaces", podName)
}
return namespaceFilesList, nil
}
// copyFileFromPod copies a single file from a pod to a local destination
func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcDir, srcFile, destFile string) error {
// Get the pod to retrieve its node name
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pod %s in namespace %s: %w", podName, namespace, err)
}
// Construct the complete path using /data, the node name, srcDir, and srcFile
nodeName := pod.Spec.NodeName
srcFilePath := filepath.Join("data", nodeName, srcDir, srcFile)
// Execute the `cat` command to read the file at the srcFilePath
cmd := []string{"cat", srcFilePath}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize executor for pod %s in namespace %s: %w", podName, namespace, err)
}
// Create the local file to write the content to
outFile, err := os.Create(destFile)
if err != nil {
return fmt.Errorf("failed to create destination file: %w", err)
}
defer outFile.Close()
// Capture stderr for error logging
var stderrBuf bytes.Buffer
// Stream the file content from the pod to the local file
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: outFile,
Stderr: &stderrBuf,
})
if err != nil {
return fmt.Errorf("error copying file from pod %s in namespace %s: %s", podName, namespace, stderrBuf.String())
}
return nil
}
func mergePCAPs(outputFile string, inputFiles []string) error {
// Create the output file
f, err := os.Create(outputFile)
if err != nil {
return err
}
defer f.Close()
// Create a pcap writer for the output file
writer := pcapgo.NewWriter(f)
err = writer.WriteFileHeader(65536, layers.LinkTypeEthernet) // Snapshot length and LinkType
if err != nil {
return err
}
for _, inputFile := range inputFiles {
log.Info().Msgf("Merging %s int %s", inputFile, outputFile)
// Open each input file
file, err := os.Open(inputFile)
if err != nil {
log.Error().Err(err).Msgf("Failed to open %v", inputFile)
continue
}
defer file.Close()
reader, err := pcapgo.NewReader(file)
if err != nil {
log.Error().Err(err).Msgf("Failed to create pcapng reader for %v", file.Name())
continue
}
// Create the packet source
packetSource := gopacket.NewPacketSource(reader, layers.LinkTypeEthernet)
for packet := range packetSource.Packets() {
err := writer.WritePacket(packet.Metadata().CaptureInfo, packet.Data())
if err != nil {
log.Error().Err(err).Msgf("Failed to write packet to %v", outputFile)
continue
}
}
}
return nil
}
// copyPcapFiles function for copying the PCAP files from the worker pods
func copyPcapFiles(clientset *kubernetes.Clientset, config *rest.Config, destDir string, cutoffTime *time.Time) error {
kubernetesProvider, err := getKubernetesProviderForCli(false, false)
if err != nil {
log.Error().Err(err).Send()
return err
}
targetNamespaces := kubernetesProvider.GetNamespaces()
// List worker pods
workerPods, err := listWorkerPods(context.Background(), clientset, targetNamespaces)
if err != nil {
log.Error().Err(err).Msg("Error listing worker pods")
return err
}
var currentFiles []string
// Iterate over each pod to get the PCAP directory from config and copy files
for _, pod := range workerPods {
// Get the list of NamespaceFiles (files per namespace) and their source directories
namespaceFiles, err := listFilesInPodDir(context.Background(), clientset, config, pod.Name, targetNamespaces, SELF_RESOURCES_PREFIX+SUFFIX_CONFIG_MAP, "PCAP_SRC_DIR", cutoffTime)
if err != nil {
log.Error().Err(err).Msgf("Error listing files in pod %s", pod.Name)
continue
}
// Copy each file from the pod to the local destination for each namespace
for _, nsFiles := range namespaceFiles {
for _, file := range nsFiles.Files {
destFile := filepath.Join(destDir, file)
// Pass the correct namespace and related details to the function
err = copyFileFromPod(context.Background(), clientset, config, pod.Name, nsFiles.Namespace, nsFiles.SrcDir, file, destFile)
if err != nil {
log.Error().Err(err).Msgf("Error copying file from pod %s in namespace %s", pod.Name, nsFiles.Namespace)
} else {
log.Info().Msgf("Copied %s from %s to %s", file, pod.Name, destFile)
}
currentFiles = append(currentFiles, destFile)
}
}
}
if len(currentFiles) == 0 {
log.Error().Msgf("No files to merge")
return nil
// continue
}
// Generate a temporary filename based on the first file
tempMergedFile := currentFiles[0] + "_temp"
// Merge the PCAPs into the temporary file
err = mergePCAPs(tempMergedFile, currentFiles)
if err != nil {
log.Error().Err(err).Msgf("Error merging files")
return err
// continue
}
// Remove the original files after merging
for _, file := range currentFiles {
err := os.Remove(file)
if err != nil {
log.Error().Err(err).Msgf("Error removing file %s", file)
}
}
// Rename the temp file to the final name (removing "_temp")
finalMergedFile := strings.TrimSuffix(tempMergedFile, "_temp")
err = os.Rename(tempMergedFile, finalMergedFile)
if err != nil {
log.Error().Err(err).Msgf("Error renaming merged file %s", tempMergedFile)
// continue
return err
}
log.Info().Msgf("Merged file created: %s", finalMergedFile)
return nil
}