kubeshark/cmd/pcapDumpRunner.go
2024-09-30 08:00:16 +03:00

240 lines
7.0 KiB
Go

package cmd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"github.com/kubeshark/gopacket"
"github.com/kubeshark/gopacket/layers"
"github.com/kubeshark/gopacket/pcapgo"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
clientk8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)
const label = "app.kubeshark.co/app=worker"
// listWorkerPods fetches all the worker pods using the Kubernetes client
func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespace string) (*corev1.PodList, error) {
labelSelector := label
// List all pods matching the label
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, fmt.Errorf("failed to list worker pods: %w", err)
}
return pods, nil
}
// listFilesInPodDir lists all files in the specified directory inside the pod
func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName, namespace, srcDir string) ([]string, error) {
cmd := []string{"ls", srcDir}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return nil, fmt.Errorf("failed to initialize executor: %w", err)
}
// Buffer to capture stdout (file listing)
var stdoutBuf bytes.Buffer
var stderrBuf bytes.Buffer
// Stream the result of the ls command
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdoutBuf,
Stderr: &stderrBuf, // Capture stderr for better debugging
})
if err != nil {
return nil, fmt.Errorf("error listing files in pod: %w. Stderr: %s", err, stderrBuf.String())
}
// Split the output (file names) into a list
files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n")
return files, nil
}
// copyFileFromPod copies a single file from a pod to a local destination
func copyFileFromPod(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName, namespace, srcFile, destFile string) error {
cmd := []string{"cat", srcFile}
req := clientset.CoreV1().RESTClient().Post().
Resource("pods").
Name(podName).
Namespace(namespace).
SubResource("exec").
Param("container", "sniffer").
Param("stdout", "true").
Param("stderr", "true").
Param("command", cmd[0]).
Param("command", cmd[1])
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return fmt.Errorf("failed to initialize executor: %w", err)
}
// Create a local file to write the content to
outFile, err := os.Create(destFile)
if err != nil {
return fmt.Errorf("failed to create destination file: %w", err)
}
defer outFile.Close()
// Capture stderr for error logging
var stderrBuf bytes.Buffer
// Stream the file content from the pod to the local file
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: outFile,
Stderr: &stderrBuf, // Capture stderr for better debugging
})
if err != nil {
return fmt.Errorf("error copying file from pod: %w. Stderr: %s", err, stderrBuf.String())
}
return nil
}
func mergePCAPs(outputFile string, inputFiles []string) error {
// Create the output file
f, err := os.Create(outputFile)
if err != nil {
return err
}
defer f.Close()
// Create a pcap writer for the output file
writer := pcapgo.NewWriter(f)
err = writer.WriteFileHeader(65536, layers.LinkTypeEthernet) // Snapshot length and LinkType
if err != nil {
return err
}
for _, inputFile := range inputFiles {
// Open each input file
file, err := os.Open(inputFile)
if err != nil {
return err
}
defer file.Close()
reader, err := pcapgo.NewReader(file)
if err != nil {
log.Error().Err(err).Msgf("Failed to create pcapng reader for %v", file.Name())
return err
}
// Create the packet source
packetSource := gopacket.NewPacketSource(reader, layers.LinkTypeEthernet)
for packet := range packetSource.Packets() {
err := writer.WritePacket(packet.Metadata().CaptureInfo, packet.Data())
if err != nil {
return err
}
}
}
return nil
}
func setPcapConfigInKubernetes(clientset *clientk8s.Clientset, podName, namespace, enabledPcap, timeInterval, maxTime, maxSize string) error {
// Load the existing ConfigMap
configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), "kubeshark-config-map", metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to load ConfigMap: %w", err)
}
// Update the values with user-provided input
if len(configMap.Data["PCAP_TIME_INTERVAL"]) > 0 {
configMap.Data["PCAP_TIME_INTERVAL"] = timeInterval
}
if len(configMap.Data["PCAP_MAX_SIZE"]) > 0 {
configMap.Data["PCAP_MAX_SIZE"] = maxSize
}
if len(configMap.Data["PCAP_MAX_TIME"]) > 0 {
configMap.Data["PCAP_MAX_TIME"] = maxTime
}
if len(configMap.Data["PCAP_DUMP_ENABLE"]) > 0 {
configMap.Data["PCAP_DUMP_ENABLE"] = enabledPcap
}
// Apply the updated ConfigMap back to the cluster
_, err = clientset.CoreV1().ConfigMaps(namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update ConfigMap: %w", err)
}
return nil
}
// WorkerSrcResponse represents the response structure from the worker's /pcaps/worker-src endpoint.
type WorkerSrcResponse struct {
WorkerSrcDir string `json:"workerSrcDir"`
}
// getWorkerSource fetches the worker source directory from the worker pod via the /pcaps/worker-src endpoint.
func getWorkerSource(clientset *kubernetes.Clientset, podName, namespace string) (string, error) {
// Get the worker pod IP or service address (you can also use the cluster DNS name)
pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
log.Error().Err(err).Msgf("Failed to get pod %s", podName)
return "", err
}
// Construct the URL to access the worker's /pcaps/worker-src endpoint
workerURL := fmt.Sprintf("http://%s:30001/pcaps/worker-src", pod.Status.PodIP)
// Make an HTTP request to the worker pod's endpoint
resp, err := http.Get(workerURL)
if err != nil {
log.Error().Err(err).Msgf("Failed to request worker src dir from %s", workerURL)
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to get worker src dir, status code: %d", resp.StatusCode)
}
// Read the response body
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
// Parse the JSON response
var workerSrcResp WorkerSrcResponse
err = json.Unmarshal(body, &workerSrcResp)
if err != nil {
return "", fmt.Errorf("failed to parse worker src dir response: %v", err)
}
return workerSrcResp.WorkerSrcDir, nil
}