mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-02 21:21:02 +00:00
* Add --time param to pcapdump * Update description * Remove obsolete code * Revert config change * Add time to pcap config --------- Co-authored-by: bogdan.balan1 <bogdanvalentin.balan@1nce.com> Co-authored-by: Alon Girmonsky <1990761+alongir@users.noreply.github.com>
346 lines
10 KiB
Go
346 lines
10 KiB
Go
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/kubeshark/gopacket"
|
|
"github.com/kubeshark/gopacket/layers"
|
|
"github.com/kubeshark/gopacket/pcapgo"
|
|
"github.com/rs/zerolog/log"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
clientk8s "k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/remotecommand"
|
|
)
|
|
|
|
const label = "app.kubeshark.co/app=worker"
|
|
const SELF_RESOURCES_PREFIX = "kubeshark-"
|
|
const SUFFIX_CONFIG_MAP = "config-map"
|
|
|
|
// NamespaceFiles represents the namespace and the files found in that namespace.
|
|
type NamespaceFiles struct {
|
|
Namespace string // The namespace in which the files were found
|
|
SrcDir string // The source directory from which the files were listed
|
|
Files []string // List of files found in the namespace
|
|
}
|
|
|
|
// listWorkerPods fetches all worker pods from multiple namespaces
|
|
func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespaces []string) ([]corev1.Pod, error) {
|
|
var allPods []corev1.Pod
|
|
labelSelector := label
|
|
|
|
for _, namespace := range namespaces {
|
|
// List all pods matching the label in the current namespace
|
|
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
|
|
LabelSelector: labelSelector,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list worker pods in namespace %s: %w", namespace, err)
|
|
}
|
|
|
|
// Accumulate the pods
|
|
allPods = append(allPods, pods.Items...)
|
|
}
|
|
|
|
return allPods, nil
|
|
}
|
|
|
|
// listFilesInPodDir lists all files in the specified directory inside the pod across multiple namespaces
|
|
func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName string, namespaces []string, configMapName, configMapKey string, cutoffTime *time.Time) ([]NamespaceFiles, error) {
|
|
var namespaceFilesList []NamespaceFiles
|
|
|
|
for _, namespace := range namespaces {
|
|
// Attempt to get the ConfigMap in the current namespace
|
|
configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, metav1.GetOptions{})
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Check if the source directory exists in the ConfigMap
|
|
srcDir, ok := configMap.Data[configMapKey]
|
|
if !ok || srcDir == "" {
|
|
log.Error().Msgf("source directory not found in ConfigMap %s in namespace %s", configMapName, namespace)
|
|
continue
|
|
}
|
|
|
|
// Attempt to get the pod in the current namespace
|
|
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("failed to get pod %s in namespace %s", podName, namespace)
|
|
continue
|
|
}
|
|
|
|
nodeName := pod.Spec.NodeName
|
|
srcFilePath := filepath.Join("data", nodeName, srcDir)
|
|
|
|
cmd := []string{"ls", srcFilePath}
|
|
req := clientset.CoreV1().RESTClient().Post().
|
|
Resource("pods").
|
|
Name(podName).
|
|
Namespace(namespace).
|
|
SubResource("exec").
|
|
Param("container", "sniffer").
|
|
Param("stdout", "true").
|
|
Param("stderr", "true").
|
|
Param("command", cmd[0]).
|
|
Param("command", cmd[1])
|
|
|
|
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("failed to initialize executor for pod %s in namespace %s", podName, namespace)
|
|
continue
|
|
}
|
|
|
|
var stdoutBuf bytes.Buffer
|
|
var stderrBuf bytes.Buffer
|
|
|
|
// Execute the command to list files
|
|
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
|
|
Stdout: &stdoutBuf,
|
|
Stderr: &stderrBuf,
|
|
})
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("error listing files in pod %s in namespace %s: %s", podName, namespace, stderrBuf.String())
|
|
continue
|
|
}
|
|
|
|
// Split the output (file names) into a list
|
|
files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n")
|
|
if len(files) == 0 {
|
|
log.Info().Msgf("No files found in directory %s in pod %s", srcFilePath, podName)
|
|
continue
|
|
}
|
|
|
|
var filteredFiles []string
|
|
|
|
// Filter files based on cutoff time if provided
|
|
for _, file := range files {
|
|
if cutoffTime != nil {
|
|
parts := strings.Split(file, "-")
|
|
if len(parts) < 2 {
|
|
log.Warn().Msgf("Skipping file with invalid format: %s", file)
|
|
continue
|
|
}
|
|
|
|
timestampStr := parts[len(parts)-2] + parts[len(parts)-1][:6] // Extract YYYYMMDDHHMMSS
|
|
fileTime, err := time.Parse("20060102150405", timestampStr)
|
|
if err != nil {
|
|
log.Warn().Err(err).Msgf("Skipping file with unparsable timestamp: %s", file)
|
|
continue
|
|
}
|
|
|
|
if fileTime.Before(*cutoffTime) {
|
|
continue
|
|
}
|
|
}
|
|
// Add file to filtered list
|
|
filteredFiles = append(filteredFiles, file)
|
|
}
|
|
|
|
if len(filteredFiles) > 0 {
|
|
namespaceFilesList = append(namespaceFilesList, NamespaceFiles{
|
|
Namespace: namespace,
|
|
SrcDir: srcDir,
|
|
Files: filteredFiles,
|
|
})
|
|
}
|
|
}
|
|
|
|
if len(namespaceFilesList) == 0 {
|
|
return nil, fmt.Errorf("no files found in pod %s across the provided namespaces", podName)
|
|
}
|
|
|
|
return namespaceFilesList, nil
|
|
}
|
|
|
|
// copyFileFromPod copies a single file from a pod to a local destination
|
|
func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcDir, srcFile, destFile string) error {
|
|
// Get the pod to retrieve its node name
|
|
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get pod %s in namespace %s: %w", podName, namespace, err)
|
|
}
|
|
|
|
// Construct the complete path using /data, the node name, srcDir, and srcFile
|
|
nodeName := pod.Spec.NodeName
|
|
srcFilePath := filepath.Join("data", nodeName, srcDir, srcFile)
|
|
|
|
// Execute the `cat` command to read the file at the srcFilePath
|
|
cmd := []string{"cat", srcFilePath}
|
|
req := clientset.CoreV1().RESTClient().Post().
|
|
Resource("pods").
|
|
Name(podName).
|
|
Namespace(namespace).
|
|
SubResource("exec").
|
|
Param("container", "sniffer").
|
|
Param("stdout", "true").
|
|
Param("stderr", "true").
|
|
Param("command", cmd[0]).
|
|
Param("command", cmd[1])
|
|
|
|
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize executor for pod %s in namespace %s: %w", podName, namespace, err)
|
|
}
|
|
|
|
// Create the local file to write the content to
|
|
outFile, err := os.Create(destFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create destination file: %w", err)
|
|
}
|
|
defer outFile.Close()
|
|
|
|
// Capture stderr for error logging
|
|
var stderrBuf bytes.Buffer
|
|
|
|
// Stream the file content from the pod to the local file
|
|
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
|
|
Stdout: outFile,
|
|
Stderr: &stderrBuf,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error copying file from pod %s in namespace %s: %s", podName, namespace, stderrBuf.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func mergePCAPs(outputFile string, inputFiles []string) error {
|
|
// Create the output file
|
|
f, err := os.Create(outputFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
// Create a pcap writer for the output file
|
|
writer := pcapgo.NewWriter(f)
|
|
err = writer.WriteFileHeader(65536, layers.LinkTypeEthernet) // Snapshot length and LinkType
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, inputFile := range inputFiles {
|
|
log.Info().Msgf("Merging %s int %s", inputFile, outputFile)
|
|
// Open each input file
|
|
file, err := os.Open(inputFile)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Failed to open %v", inputFile)
|
|
continue
|
|
}
|
|
defer file.Close()
|
|
|
|
reader, err := pcapgo.NewReader(file)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Failed to create pcapng reader for %v", file.Name())
|
|
continue
|
|
}
|
|
|
|
// Create the packet source
|
|
packetSource := gopacket.NewPacketSource(reader, layers.LinkTypeEthernet)
|
|
|
|
for packet := range packetSource.Packets() {
|
|
err := writer.WritePacket(packet.Metadata().CaptureInfo, packet.Data())
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Failed to write packet to %v", outputFile)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// copyPcapFiles function for copying the PCAP files from the worker pods
|
|
func copyPcapFiles(clientset *kubernetes.Clientset, config *rest.Config, destDir string, cutoffTime *time.Time) error {
|
|
kubernetesProvider, err := getKubernetesProviderForCli(false, false)
|
|
if err != nil {
|
|
log.Error().Err(err).Send()
|
|
return err
|
|
}
|
|
|
|
targetNamespaces := kubernetesProvider.GetNamespaces()
|
|
|
|
// List worker pods
|
|
workerPods, err := listWorkerPods(context.Background(), clientset, targetNamespaces)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Error listing worker pods")
|
|
return err
|
|
}
|
|
var currentFiles []string
|
|
|
|
// Iterate over each pod to get the PCAP directory from config and copy files
|
|
for _, pod := range workerPods {
|
|
// Get the list of NamespaceFiles (files per namespace) and their source directories
|
|
namespaceFiles, err := listFilesInPodDir(context.Background(), clientset, config, pod.Name, targetNamespaces, SELF_RESOURCES_PREFIX+SUFFIX_CONFIG_MAP, "PCAP_SRC_DIR", cutoffTime)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Error listing files in pod %s", pod.Name)
|
|
continue
|
|
}
|
|
|
|
// Copy each file from the pod to the local destination for each namespace
|
|
for _, nsFiles := range namespaceFiles {
|
|
for _, file := range nsFiles.Files {
|
|
destFile := filepath.Join(destDir, file)
|
|
|
|
// Pass the correct namespace and related details to the function
|
|
err = copyFileFromPod(context.Background(), clientset, config, pod.Name, nsFiles.Namespace, nsFiles.SrcDir, file, destFile)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Error copying file from pod %s in namespace %s", pod.Name, nsFiles.Namespace)
|
|
} else {
|
|
log.Info().Msgf("Copied %s from %s to %s", file, pod.Name, destFile)
|
|
}
|
|
|
|
currentFiles = append(currentFiles, destFile)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
if len(currentFiles) == 0 {
|
|
log.Error().Msgf("No files to merge")
|
|
return nil
|
|
// continue
|
|
}
|
|
|
|
// Generate a temporary filename based on the first file
|
|
tempMergedFile := currentFiles[0] + "_temp"
|
|
|
|
// Merge the PCAPs into the temporary file
|
|
err = mergePCAPs(tempMergedFile, currentFiles)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Error merging files")
|
|
return err
|
|
// continue
|
|
}
|
|
|
|
// Remove the original files after merging
|
|
for _, file := range currentFiles {
|
|
err := os.Remove(file)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Error removing file %s", file)
|
|
}
|
|
}
|
|
|
|
// Rename the temp file to the final name (removing "_temp")
|
|
finalMergedFile := strings.TrimSuffix(tempMergedFile, "_temp")
|
|
err = os.Rename(tempMergedFile, finalMergedFile)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Error renaming merged file %s", tempMergedFile)
|
|
// continue
|
|
return err
|
|
}
|
|
|
|
log.Info().Msgf("Merged file created: %s", finalMergedFile)
|
|
|
|
return nil
|
|
}
|