Add cmd to copy pcaps from worker

This commit is contained in:
bogdan.balan1 2024-09-27 18:34:05 +03:00
parent 32caeb37e4
commit 1720f19b45
7 changed files with 565 additions and 2 deletions

100
cmd/pcapDumpCopy.go Normal file
View File

@ -0,0 +1,100 @@
package cmd
import (
"context"
"path/filepath"
"github.com/creasty/defaults"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
const (
configPath = "/app/config/pcap_config.txt"
namespace = "default"
)
// pcapCopyCmd represents the pcapcopy command
var pcapCopyCmd = &cobra.Command{
Use: "pcapcopy",
Short: "Copy PCAP files from worker pods to the local destination",
RunE: func(cmd *cobra.Command, args []string) error {
// Create Kubernetes client
kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Error().Err(err).Msg("Error building kubeconfig")
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error().Err(err).Msg("Error creating Kubernetes client")
return err
}
// List worker pods
workerPods, err := listWorkerPods(context.Background(), clientset, namespace)
if err != nil {
log.Error().Err(err).Msg("Error listing worker pods")
return err
}
// Destination directory for the files
destDir, _ := cmd.Flags().GetString(configStructs.PcapDumpCopy)
// Iterate over each pod to get the PCAP directory from config and copy files
for _, pod := range workerPods.Items {
// Read the config file from the pod to get the PCAP_DIR value
configMap, err := readConfigFileFromPod(context.Background(), clientset, config, pod.Name, namespace, configPath)
if err != nil {
log.Error().Err(err).Msgf("Error reading config file from pod %s", pod.Name)
continue
}
// Use the PCAP_DIR value from the config file
srcDir := configMap["PCAP_DIR"]
if srcDir == "" {
log.Error().Msgf("PCAP_DIR not found in config for pod %s", pod.Name)
continue
}
// List files in the PCAP directory on the pod
files, err := listFilesInPodDir(context.Background(), clientset, config, pod.Name, namespace, srcDir)
if err != nil {
log.Error().Err(err).Msgf("Error listing files in pod %s", pod.Name)
continue
}
// Copy each file from the pod to the local destination
for _, file := range files {
srcFile := filepath.Join(srcDir, file)
destFile := filepath.Join(destDir, pod.Name+"_"+file)
err = copyFileFromPod(context.Background(), clientset, config, pod.Name, namespace, srcFile, destFile)
if err != nil {
log.Error().Err(err).Msgf("Error copying file from pod %s", pod.Name)
}
}
}
return nil
},
}
func init() {
rootCmd.AddCommand(pcapCopyCmd)
defaultTapConfig := configStructs.TapConfig{}
if err := defaults.Set(&defaultTapConfig); err != nil {
log.Debug().Err(err).Send()
}
// Use full flag name without shorthand
pcapCopyCmd.Flags().String(configStructs.PcapDumpCopy, defaultTapConfig.Misc.PcapDest, "Local destination path for the copied files (required)")
_ = pcapCopyCmd.MarkFlagRequired("dest")
}

259
cmd/pcapDumpRunner.go Normal file
View File

@ -0,0 +1,259 @@
package cmd
import (
"bytes"
"context"
"fmt"
"os"
"strings"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)
const label = "app.kubeshark.co/app=worker"
// listWorkerPods fetches all the worker pods using the Kubernetes client
func listWorkerPods(ctx context.Context, clientset *kubernetes.Clientset, namespace string) (*corev1.PodList, error) {
labelSelector := label
// List all pods matching the label
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, fmt.Errorf("failed to list worker pods: %w", err)
}
return pods, nil
}
// listFilesInPodDir lists all files in the specified directory inside the pod
func listFilesInPodDir(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcDir string) ([]string, error) {
cmd := []string{"ls", srcDir}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return nil, fmt.Errorf("failed to initialize executor: %w", err)
}
// Buffer to capture stdout (file listing)
var stdoutBuf bytes.Buffer
var stderrBuf bytes.Buffer
// Stream the result of the ls command
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf, // Capture stderr for better debugging
})
if err != nil {
return nil, fmt.Errorf("error listing files in pod: %w. Stderr: %s", err, stderrBuf.String())
}
// Split the output (file names) into a list
files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n")
return files, nil
}
// copyFileFromPod copies a single file from a pod to a local destination
func copyFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, srcFile, destFile string) error {
cmd := []string{"cat", srcFile}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize executor: %w", err)
}
// Create a local file to write the content to
outFile, err := os.Create(destFile)
if err != nil {
return fmt.Errorf("failed to create destination file: %w", err)
}
defer outFile.Close()
// Capture stderr for error logging
var stderrBuf bytes.Buffer
// Stream the file content from the pod to the local file
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: outFile,
Stderr: &stderrBuf, // Capture stderr for better debugging
})
if err != nil {
return fmt.Errorf("error copying file from pod: %w. Stderr: %s", err, stderrBuf.String())
}
fmt.Printf("File from pod %s copied to local destination: %s\n", podName, destFile)
return nil
}
// updatePodEnvVars updates the configuration file inside the worker pod
func updatePodEnvVars(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace string, stop bool, timeInterval, maxTime, maxSize string) error {
var envVars []string
if stop {
envVars = append(envVars, "PCAP_DUMP_ENABLE=false")
} else {
envVars = append(envVars, "PCAP_DUMP_ENABLE=true")
if timeInterval != "" {
envVars = append(envVars, fmt.Sprintf("TIME_INTERVAL=%s", timeInterval))
}
if maxTime != "" {
envVars = append(envVars, fmt.Sprintf("MAX_TIME=%s", maxTime))
}
if maxSize != "" {
envVars = append(envVars, fmt.Sprintf("MAX_SIZE=%s", maxSize))
}
}
// Create a command that sets the environment variables directly in the pod
for _, envVar := range envVars {
cmd := []string{"sh", "-c", fmt.Sprintf("export %s", envVar)}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer"). // Assuming container is called 'sniffer'
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1]).
Param("command", cmd[2])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize executor for pod %s: %w", podName, err)
}
var stdoutBuf, stderrBuf bytes.Buffer
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf,
})
if err != nil {
return fmt.Errorf("failed to update env vars in pod %s: %w. Stderr: %s", podName, err, stderrBuf.String())
}
}
fmt.Printf("Updated env vars for pod %s\n", podName)
return nil
}
// readConfigFileFromPod reads the configuration file from the pod
func readConfigFileFromPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, configFilePath string) (map[string]string, error) {
cmd := []string{"cat", configFilePath}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return nil, fmt.Errorf("failed to initialize executor for pod %s: %w", podName, err)
}
var stdoutBuf, stderrBuf bytes.Buffer
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf,
})
if err != nil {
return nil, fmt.Errorf("failed to read config file from pod %s: %w. Stderr: %s", podName, err, stderrBuf.String())
}
// Parse the config content into a map of key-value pairs
configMap := parseConfigContent(stdoutBuf.String())
return configMap, nil
}
// writeConfigFileToPod writes the updated configuration map to the file in the pod
func writeConfigFileToPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, configFilePath string, configMap map[string]string) error {
// Convert the config map back to a string format for writing
configContent := formatConfigMapToString(configMap)
// Escape any single quotes in the config content to avoid issues in the shell command
escapedConfigContent := strings.ReplaceAll(configContent, "'", "'\\''")
// Prepare the command to write the configuration to the file
cmd := []string{"sh", "-c", fmt.Sprintf("echo '%s' > %s", escapedConfigContent, configFilePath)}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1]).
Param("command", cmd[2])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize executor for pod %s: %w", podName, err)
}
var stdoutBuf, stderrBuf bytes.Buffer
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf,
})
if err != nil {
return fmt.Errorf("failed to write config file to pod %s: %w. Stderr: %s", podName, err, stderrBuf.String())
}
return nil
}
// parseConfigContent parses the content of the config file into a map of key-value pairs
func parseConfigContent(content string) map[string]string {
configMap := make(map[string]string)
lines := strings.Split(content, "\n")
for _, line := range lines {
if strings.TrimSpace(line) == "" || !strings.Contains(line, "=") {
continue
}
parts := strings.SplitN(line, "=", 2)
configMap[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
}
return configMap
}
// formatConfigMapToString converts the config map back to string format
func formatConfigMapToString(configMap map[string]string) string {
var sb strings.Builder
for key, value := range configMap {
sb.WriteString(fmt.Sprintf("%s=%s\n", key, value))
}
return sb.String()
}

101
cmd/pcapDumpStart.go Normal file
View File

@ -0,0 +1,101 @@
package cmd
import (
"context"
"fmt"
"path/filepath"
"github.com/creasty/defaults"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// pcapCmd represents the pcapstart command
var pcapStartCmd = &cobra.Command{
Use: "pcapstart",
Short: "Start capturing traffic and generate a PCAP file",
RunE: func(cmd *cobra.Command, args []string) error {
// Create Kubernetes client
kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Error().Err(err).Msg("Error building kubeconfig")
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error().Err(err).Msg("Error creating Kubernetes client")
return err
}
// List worker pods
workerPods, err := listWorkerPods(context.Background(), clientset, namespace)
if err != nil {
log.Error().Err(err).Msg("Error listing worker pods")
return err
}
// Read the flags for time interval, max time, and max size
timeInterval, _ := cmd.Flags().GetString("time-interval")
maxTime, _ := cmd.Flags().GetString("max-time")
maxSize, _ := cmd.Flags().GetString("max-size")
// Iterate over each pod to start the PCAP capture by updating the config file
for _, pod := range workerPods.Items {
err := writeStartConfigToFileInPod(context.Background(), clientset, config, pod.Name, namespace, timeInterval, maxTime, maxSize)
if err != nil {
log.Error().Err(err).Msgf("Error updating config file for pod %s", pod.Name)
continue
}
fmt.Printf("PCAP capture started for pod %s\n", pod.Name)
}
return nil
},
}
func init() {
rootCmd.AddCommand(pcapStartCmd)
defaultTapConfig := configStructs.TapConfig{}
if err := defaults.Set(&defaultTapConfig); err != nil {
log.Debug().Err(err).Send()
}
// Use full flag name without shorthand
pcapStartCmd.Flags().String("time-interval", defaultTapConfig.Misc.TimeInterval, "Time interval for PCAP file rotation (e.g., 1m, 2h)")
pcapStartCmd.Flags().String("max-time", defaultTapConfig.Misc.MaxTime, "Maximum time for retaining old PCAP files (e.g., 24h)")
pcapStartCmd.Flags().String("max-size", defaultTapConfig.Misc.MaxSize, "Maximum size of PCAP files before deletion (e.g., 500MB, 10GB)")
}
// writeStartConfigToFileInPod writes config to start pcap in the worker pods
func writeStartConfigToFileInPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace, timeInterval, maxTime, maxSize string) error {
existingConfig, err := readConfigFileFromPod(ctx, clientset, config, podName, namespace, configPath)
if err != nil {
return fmt.Errorf("failed to read config file from pod %s: %w", podName, err)
}
existingConfig["PCAP_DUMP_ENABLE"] = "true"
if timeInterval != "" {
existingConfig["TIME_INTERVAL"] = timeInterval
}
if maxTime != "" {
existingConfig["MAX_TIME"] = maxTime
}
if maxSize != "" {
existingConfig["MAX_SIZE"] = maxSize
}
err = writeConfigFileToPod(ctx, clientset, config, podName, namespace, configPath, existingConfig)
if err != nil {
return fmt.Errorf("failed to write config file to pod %s: %w", podName, err)
}
return nil
}

83
cmd/pcapDumpStop.go Normal file
View File

@ -0,0 +1,83 @@
package cmd
import (
"context"
"fmt"
"path/filepath"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
// pcapstopCmd represents the pcapstop command
var pcapStopCmd = &cobra.Command{
Use: "pcapstop",
Short: "Stop capturing traffic and close the PCAP dump",
RunE: func(cmd *cobra.Command, args []string) error {
// Call the function to stop PCAP capture
return stopPcap(cmd)
},
}
func init() {
rootCmd.AddCommand(pcapStopCmd)
}
func stopPcap(cmd *cobra.Command) error {
fmt.Println("Stopping PCAP capture.")
// Load Kubernetes configuration
kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Error().Err(err).Msg("Error building kubeconfig")
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error().Err(err).Msg("Error creating Kubernetes client")
return err
}
// Get the list of worker pods
workerPods, err := listWorkerPods(context.Background(), clientset, namespace)
if err != nil {
log.Error().Err(err).Msg("Error listing worker pods")
return err
}
// Iterate over the worker pods and set config to stop pcap
for _, pod := range workerPods.Items {
err := writeStopConfigToFileInPod(context.Background(), clientset, config, pod.Name, namespace)
if err != nil {
log.Error().Err(err).Msgf("Error updating config file for pod %s", pod.Name)
continue
}
fmt.Printf("PCAP capture stopped for pod %s\n", pod.Name)
}
fmt.Println("PCAP capture stopped successfully.")
return nil
}
// writeStopConfigToFileInPod reads the existing config, updates the PCAP_DUMP_ENABLE value, and writes it back to the file
func writeStopConfigToFileInPod(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, namespace string) error {
existingConfig, err := readConfigFileFromPod(ctx, clientset, config, podName, namespace, configPath)
if err != nil {
return fmt.Errorf("failed to read config file from pod %s: %w", podName, err)
}
existingConfig["PCAP_DUMP_ENABLE"] = "false"
err = writeConfigFileToPod(ctx, clientset, config, podName, namespace, configPath, existingConfig)
if err != nil {
return fmt.Errorf("failed to write config file to pod %s: %w", podName, err)
}
return nil
}

View File

@ -34,6 +34,7 @@ const (
DebugLabel = "debug"
ContainerPort = 80
ContainerPortStr = "80"
PcapDumpCopy = "dest"
)
type ResourceLimitsHub struct {
@ -169,6 +170,11 @@ type MiscConfig struct {
Profile bool `yaml:"profile" json:"profile" default:"false"`
DuplicateTimeframe string `yaml:"duplicateTimeframe" json:"duplicateTimeframe" default:"200ms"`
DetectDuplicates bool `yaml:"detectDuplicates" json:"detectDuplicates" default:"false"`
PcapDumpEnable bool `yaml:"pcapDumpEnable" json:"pcapDumpEnable" default:"false"`
TimeInterval string `yaml:"timeInterval" json:"timeInterval" default:"1m"`
MaxTime string `yaml:"maxTime" json:"maxTime" default:"24h"`
MaxSize string `yaml:"maxSize" json:"maxSize" default:"500MB"`
PcapDest string `yaml:"pcapDest" json:"pcapDest" default:"."`
}
type TapConfig struct {

View File

@ -71,9 +71,9 @@ spec:
- -debug
{{- end }}
{{- if .Values.tap.docker.overrideTag.worker }}
image: '{{ .Values.tap.docker.registry }}/worker:{{ .Values.tap.docker.overrideTag.worker }}{{ include "kubeshark.dockerTagDebugVersion" . }}'
image: '192.168.49.2:5000/kubeshark/worker:dev'
{{ else }}
image: '{{ .Values.tap.docker.registry }}/worker:{{ not (eq .Values.tap.docker.tag "") | ternary .Values.tap.docker.tag (printf "v%s" .Chart.Version) }}{{ include "kubeshark.dockerTagDebugVersion" . }}'
image: '192.168.49.2:5000/kubeshark/worker:dev'
{{- end }}
imagePullPolicy: {{ .Values.tap.docker.imagePullPolicy }}
name: sniffer
@ -98,6 +98,14 @@ spec:
value: 'https://api.kubeshark.co'
- name: PROFILING_ENABLED
value: '{{ .Values.tap.misc.profile }}'
- name: PCAP_DUMP_ENABLE
value: '{{ .Values.tap.worker.env.pcapDumpEnable }}'
- name: TIME_INTERVAL
value: '{{ .Values.tap.worker.env.timeInterval }}'
- name: MAX_TIME
value: '{{ .Values.tap.worker.env.maxTime }}'
- name: MAX_SIZE
value: '{{ .Values.tap.worker.env.maxSize }}'
resources:
limits:
cpu: {{ .Values.tap.resources.sniffer.limits.cpu }}

View File

@ -140,6 +140,12 @@ tap:
profile: false
duplicateTimeframe: 200ms
detectDuplicates: false
worker:
env:
pcapDumpEnable: true
timeInterval: 1m
maxTime: 2h
maxSize: 10GB
logs:
file: ""
grep: ""