mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-25 03:49:12 +00:00
240 lines
7.0 KiB
Go
240 lines
7.0 KiB
Go
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/kubeshark/gopacket"
|
|
"github.com/kubeshark/gopacket/layers"
|
|
"github.com/kubeshark/gopacket/pcapgo"
|
|
"github.com/rs/zerolog/log"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
clientk8s "k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/remotecommand"
|
|
)
|
|
|
|
const label = "app.kubeshark.co/app=worker"
|
|
|
|
// listWorkerPods fetches all the worker pods using the Kubernetes client
|
|
func listWorkerPods(ctx context.Context, clientset *clientk8s.Clientset, namespace string) (*corev1.PodList, error) {
|
|
labelSelector := label
|
|
|
|
// List all pods matching the label
|
|
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
|
|
LabelSelector: labelSelector,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list worker pods: %w", err)
|
|
}
|
|
|
|
return pods, nil
|
|
}
|
|
|
|
// listFilesInPodDir lists all files in the specified directory inside the pod
|
|
func listFilesInPodDir(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName, namespace, srcDir string) ([]string, error) {
|
|
cmd := []string{"ls", srcDir}
|
|
req := clientset.CoreV1().RESTClient().Post().
|
|
Resource("pods").
|
|
Name(podName).
|
|
Namespace(namespace).
|
|
SubResource("exec").
|
|
Param("container", "sniffer").
|
|
Param("stdout", "true").
|
|
Param("stderr", "true").
|
|
Param("command", cmd[0]).
|
|
Param("command", cmd[1])
|
|
|
|
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize executor: %w", err)
|
|
}
|
|
|
|
// Buffer to capture stdout (file listing)
|
|
var stdoutBuf bytes.Buffer
|
|
var stderrBuf bytes.Buffer
|
|
|
|
// Stream the result of the ls command
|
|
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
|
|
Stdout: &stdoutBuf,
|
|
Stderr: &stderrBuf, // Capture stderr for better debugging
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error listing files in pod: %w. Stderr: %s", err, stderrBuf.String())
|
|
}
|
|
|
|
// Split the output (file names) into a list
|
|
files := strings.Split(strings.TrimSpace(stdoutBuf.String()), "\n")
|
|
return files, nil
|
|
}
|
|
|
|
// copyFileFromPod copies a single file from a pod to a local destination
|
|
func copyFileFromPod(ctx context.Context, clientset *clientk8s.Clientset, config *rest.Config, podName, namespace, srcFile, destFile string) error {
|
|
cmd := []string{"cat", srcFile}
|
|
req := clientset.CoreV1().RESTClient().Post().
|
|
Resource("pods").
|
|
Name(podName).
|
|
Namespace(namespace).
|
|
SubResource("exec").
|
|
Param("container", "sniffer").
|
|
Param("stdout", "true").
|
|
Param("stderr", "true").
|
|
Param("command", cmd[0]).
|
|
Param("command", cmd[1])
|
|
|
|
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize executor: %w", err)
|
|
}
|
|
|
|
// Create a local file to write the content to
|
|
outFile, err := os.Create(destFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create destination file: %w", err)
|
|
}
|
|
defer outFile.Close()
|
|
|
|
// Capture stderr for error logging
|
|
var stderrBuf bytes.Buffer
|
|
|
|
// Stream the file content from the pod to the local file
|
|
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
|
|
Stdout: outFile,
|
|
Stderr: &stderrBuf, // Capture stderr for better debugging
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error copying file from pod: %w. Stderr: %s", err, stderrBuf.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func mergePCAPs(outputFile string, inputFiles []string) error {
|
|
// Create the output file
|
|
f, err := os.Create(outputFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
// Create a pcap writer for the output file
|
|
writer := pcapgo.NewWriter(f)
|
|
err = writer.WriteFileHeader(65536, layers.LinkTypeEthernet) // Snapshot length and LinkType
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, inputFile := range inputFiles {
|
|
// Open each input file
|
|
file, err := os.Open(inputFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
reader, err := pcapgo.NewReader(file)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Failed to create pcapng reader for %v", file.Name())
|
|
return err
|
|
}
|
|
|
|
// Create the packet source
|
|
packetSource := gopacket.NewPacketSource(reader, layers.LinkTypeEthernet)
|
|
|
|
for packet := range packetSource.Packets() {
|
|
err := writer.WritePacket(packet.Metadata().CaptureInfo, packet.Data())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func setPcapConfigInKubernetes(clientset *clientk8s.Clientset, podName, namespace, enabledPcap, timeInterval, maxTime, maxSize string) error {
|
|
// Load the existing ConfigMap
|
|
configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), "kubeshark-config-map", metav1.GetOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load ConfigMap: %w", err)
|
|
}
|
|
|
|
// Update the values with user-provided input
|
|
if len(configMap.Data["PCAP_TIME_INTERVAL"]) > 0 {
|
|
configMap.Data["PCAP_TIME_INTERVAL"] = timeInterval
|
|
|
|
}
|
|
if len(configMap.Data["PCAP_MAX_SIZE"]) > 0 {
|
|
configMap.Data["PCAP_MAX_SIZE"] = maxSize
|
|
|
|
}
|
|
if len(configMap.Data["PCAP_MAX_TIME"]) > 0 {
|
|
configMap.Data["PCAP_MAX_TIME"] = maxTime
|
|
|
|
}
|
|
if len(configMap.Data["PCAP_DUMP_ENABLE"]) > 0 {
|
|
configMap.Data["PCAP_DUMP_ENABLE"] = enabledPcap
|
|
}
|
|
|
|
// Apply the updated ConfigMap back to the cluster
|
|
_, err = clientset.CoreV1().ConfigMaps(namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update ConfigMap: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// WorkerSrcResponse represents the response structure from the worker's /pcaps/worker-src endpoint.
|
|
type WorkerSrcResponse struct {
|
|
WorkerSrcDir string `json:"workerSrcDir"`
|
|
}
|
|
|
|
// getWorkerSource fetches the worker source directory from the worker pod via the /pcaps/worker-src endpoint.
|
|
func getWorkerSource(clientset *kubernetes.Clientset, podName, namespace string) (string, error) {
|
|
// Get the worker pod IP or service address (you can also use the cluster DNS name)
|
|
pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Failed to get pod %s", podName)
|
|
return "", err
|
|
}
|
|
|
|
// Construct the URL to access the worker's /pcaps/worker-src endpoint
|
|
workerURL := fmt.Sprintf("http://%s:30001/pcaps/worker-src", pod.Status.PodIP)
|
|
|
|
// Make an HTTP request to the worker pod's endpoint
|
|
resp, err := http.Get(workerURL)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Failed to request worker src dir from %s", workerURL)
|
|
return "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return "", fmt.Errorf("failed to get worker src dir, status code: %d", resp.StatusCode)
|
|
}
|
|
|
|
// Read the response body
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Parse the JSON response
|
|
var workerSrcResp WorkerSrcResponse
|
|
err = json.Unmarshal(body, &workerSrcResp)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to parse worker src dir response: %v", err)
|
|
}
|
|
|
|
return workerSrcResp.WorkerSrcDir, nil
|
|
}
|