Add kube:// prefixed URI support (#1454)

*  Add `kube://` prefixed URI support

* 🐛 Fix the `a container name must be specified for pod <POD_NAME>, choose one of: [sniffer tracer]` error

* 🐛 Fix all of the issues in `kube://` prefixed URI support

* 🐛 Fix the `invalid reference format` error

* 🐛 Fix the `kubeUrl`
This commit is contained in:
M. Mert Yildiran
2023-11-17 15:56:07 -08:00
committed by GitHub
parent a028211f0a
commit c03de2222d
7 changed files with 369 additions and 162 deletions

View File

@@ -55,7 +55,7 @@ func init() {
tapCmd.Flags().String(configStructs.StorageLimitLabel, defaultTapConfig.StorageLimit, "Override the default storage limit (per node)")
tapCmd.Flags().String(configStructs.StorageClassLabel, defaultTapConfig.StorageClass, "Override the default storage class of the PersistentVolumeClaim (per node)")
tapCmd.Flags().Bool(configStructs.DryRunLabel, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().StringP(configStructs.PcapLabel, "p", defaultTapConfig.Pcap, fmt.Sprintf("Capture from a PCAP snapshot of %s (.tar.gz) using your Docker Daemon instead of Kubernetes. TAR path from the file system or an S3 URI (object, folder or the bucket)", misc.Software))
tapCmd.Flags().StringP(configStructs.PcapLabel, "p", defaultTapConfig.Pcap, fmt.Sprintf("Capture from a PCAP snapshot of %s (.tar.gz) using your Docker Daemon instead of Kubernetes. TAR path from the file system, an S3 URI (s3://<BUCKET>/<KEY>) or a path in Kubeshark data volume (kube://<PATH>)", misc.Software))
tapCmd.Flags().Bool(configStructs.ServiceMeshLabel, defaultTapConfig.ServiceMesh, "Capture the encrypted traffic if the cluster is configured with a service mesh and with mTLS")
tapCmd.Flags().Bool(configStructs.TlsLabel, defaultTapConfig.Tls, "Capture the traffic that's encrypted with OpenSSL or Go crypto/tls libraries")
tapCmd.Flags().Bool(configStructs.IgnoreTaintedLabel, defaultTapConfig.IgnoreTainted, "Ignore tainted pods while running Worker DaemonSet")

View File

@@ -169,6 +169,7 @@ func createAndStartContainers(
"REACT_APP_DEFAULT_FILTER= ",
"REACT_APP_HUB_HOST= ",
fmt.Sprintf("REACT_APP_HUB_PORT=:%d", config.Config.Tap.Proxy.Hub.Port),
"REACT_APP_AUTH_ENABLED=false",
},
}, hostConfigFront, nil, nil, nameFront)
if err != nil {
@@ -374,6 +375,48 @@ func downloadTarFromS3(s3Url string) (tarPath string, err error) {
return
}
func downloadTarFromKubeVolume(kubeUrl string, volume string) (tarPath string, err error) {
var kubernetesProvider *kubernetes.Provider
kubernetesProvider, err = getKubernetesProviderForCli(false, false)
if err != nil {
return
}
srcPath := fmt.Sprintf("/app/%s/%s", volume, strings.TrimPrefix(kubeUrl, "kube://"))
var tempDirPath string
tempDirPath, err = os.MkdirTemp(os.TempDir(), "kubeshark_*")
if err != nil {
return
}
ctx := context.Background()
var pods []v1.Pod
pods, err = kubernetesProvider.ListPodsByAppLabel(
ctx,
config.Config.Tap.Release.Namespace,
map[string]string{"app.kubeshark.co/app": "worker"},
)
if err != nil {
return
}
for _, pod := range pods {
nodeDir := filepath.Join(tempDirPath, pod.Spec.NodeName)
if err = os.MkdirAll(nodeDir, 0755); err != nil {
return
}
err = kubernetes.CopyFromPod(ctx, kubernetesProvider, pod, srcPath, nodeDir)
if err != nil {
return
}
}
tarPath, err = tarDirectory(tempDirPath)
return
}
func tarDirectory(dirPath string) (string, error) {
tarPath := fmt.Sprintf("%s.tar.gz", dirPath)
@@ -446,6 +489,15 @@ func pcap(tarPath string) error {
}
}
if strings.HasPrefix(tarPath, "kube://") {
var err error
tarPath, err = downloadTarFromKubeVolume(tarPath, "data")
if err != nil {
log.Error().Err(err).Msg("Failed downloading from Kubeshark data volume")
return err
}
}
log.Info().Str("tar-path", tarPath).Msg("Openning")
ctx := context.Background()
@@ -456,9 +508,18 @@ func pcap(tarPath string) error {
}
defer cli.Close()
imageFront := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "front", config.Config.Tap.Docker.Tag)
imageHub := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "hub", config.Config.Tap.Docker.Tag)
imageWorker := fmt.Sprintf("%s%s:%s", config.Config.Tap.Docker.Registry, "worker", config.Config.Tap.Docker.Tag)
tag := config.Config.Tap.Docker.Tag
if tag == "" {
if misc.Ver == "0.0.0" {
tag = "latest"
} else {
tag = misc.Ver
}
}
imageFront := fmt.Sprintf("%s/%s:%s", config.Config.Tap.Docker.Registry, "front", tag)
imageHub := fmt.Sprintf("%s/%s:%s", config.Config.Tap.Docker.Registry, "hub", tag)
imageWorker := fmt.Sprintf("%s/%s:%s", config.Config.Tap.Docker.Registry, "worker", tag)
err = pullImages(ctx, cli, imageFront, imageHub, imageWorker)
if err != nil {
@@ -502,7 +563,7 @@ func pcap(tarPath string) error {
},
}
connector = connect.NewConnector(kubernetes.GetHubUrl(), connect.DefaultRetries, connect.DefaultTimeout)
connector = connect.NewConnector(kubernetes.GetProxyOnPort(config.Config.Tap.Proxy.Hub.Port), connect.DefaultRetries, connect.DefaultTimeout)
connector.PostWorkerPodToHub(workerPod)
// License
@@ -511,7 +572,7 @@ func pcap(tarPath string) error {
}
log.Info().
Str("url", kubernetes.GetHubUrl()).
Str("url", kubernetes.GetProxyOnPort(config.Config.Tap.Proxy.Hub.Port)).
Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
url := kubernetes.GetProxyOnPort(config.Config.Tap.Proxy.Front.Port)