Compare commits

...

18 Commits
38.2 ... 38.4

Author SHA1 Message Date
M. Mert Yildiran
c809117b2c 🐛 Don't open the Hub URL in the browser (proxy command) 2023-01-26 19:35:21 +03:00
M. Mert Yildiran
2c6c71cf49 Log the worker DaemonSet creation 2023-01-22 05:14:28 +03:00
M. Mert Yildiran
1533d1ec28 Suggest kubeshark proxy at the end of tap 2023-01-22 05:10:29 +03:00
M. Mert Yildiran
edf5c8cd6d Don't call cancel() in case of a proxy/port-forward error 2023-01-22 05:04:44 +03:00
M. Mert Yildiran
6c69fb6bc4 POST pod regex to Hub if Kubeshark is still running 2023-01-22 04:33:30 +03:00
M. Mert Yildiran
4dd941f8d9 Set REACT_APP_DEFAULT_FILTER to empty string 2023-01-22 04:27:05 +03:00
M. Mert Yildiran
90294e32c1 👕 Remove unused getK8sTapManagerErrorText method 2023-01-22 04:26:21 +03:00
M. Mert Yildiran
5f4a856c5e 🐛 Fix the proxy command 2023-01-22 04:25:47 +03:00
M. Mert Yildiran
4e3233ade8 Deploy workers DaemonSet without NodeSelector and call POST /pods/regex to set the pod regex in Hub 2023-01-22 04:14:44 +03:00
M. Mert Yildiran
846f253a03 Don't watch and POST Worker pods to Hub 2023-01-21 03:09:15 +03:00
M. Mert Yildiran
f128ae3993 🔥 Remove config map and image pull checks 2023-01-21 01:32:21 +03:00
Alon Girmonsky
38da25ecc8 Infra information
Lately we've been experiencing bugs resulting from an environment with multiple nodes and the absence of a CNI.
2023-01-18 10:04:47 -08:00
M. Mert Yildiran
bf777f9fca Change the color coding of new, targeted and untargeted pods log messages 2023-01-17 02:09:26 +03:00
M. Mert Yildiran
3c4272c6d1 🐛 Fix the issues in port-forward 2023-01-16 00:06:52 +03:00
Alon Girmonsky
920535b643 protocol-aware 2023-01-15 10:59:16 -08:00
Alon Girmonsky
b55dd5b072 Shorter message bar 2023-01-15 10:47:13 -08:00
Alon Girmonsky
f5cf15d657 Changing the hero message 2023-01-15 10:46:06 -08:00
Alon Girmonsky
fe623438c4 Update the upper message
include DNS and identity-aware service map.
2023-01-14 21:25:00 -08:00
19 changed files with 224 additions and 723 deletions

View File

@@ -10,6 +10,9 @@ assignees: ''
**Describe the bug**
A clear and concise description of what the bug is.
**Provide more information**
Running on EKS, AKS, GKE, Minikube, Rancher, OpenShift? Number of Nodes? CNI?
**To Reproduce**
Steps to reproduce the behavior:
1. Run `kubeshark <command> ...`

View File

@@ -25,11 +25,11 @@
<p align="center">
<b>
<a href="https://github.com/kubeshark/kubeshark/releases/tag/38.1">Version 38.1</a> is out with <a href="https://docs.kubeshark.co/en/pcap_export_import">PCAP export/import</a>, <a href="https://docs.kubeshark.co/en/tcp_streams">TCP streams</a> and so much more. Read about it <a href="https://kubeshark.co/pcap-or-it-didnt-happen">here</a>.
<a href="https://github.com/kubeshark/kubeshark/releases/latest">V38.2</a> is out with <a href="https://docs.kubeshark.co/en/pcap_export_import">PCAP</a> export, <a href="https://docs.kubeshark.co/en/dns">DNS</a>, <a href="https://docs.kubeshark.co/en/service_map">Identity-aware Service Map</a> and so much more. Read about it <a href="https://kubeshark.co/pcap-or-it-didnt-happen">here</a>.
</b>
</p>
**Kubeshark** is an API Traffic Viewer for [**Kubernetes**](https://kubernetes.io/) providing deep visibility and monitoring of all API traffic and payloads going in, out and across containers and Pods inside a Kubernetes cluster.
**Kubeshark** is an API Traffic Viewer for [**Kubernetes**](https://kubernetes.io/) providing real-time, protocol-aware visibility into Kubernetes internal network, capturing, dissecting and monitoring all traffic and payloads going in, out and across containers, pods, nodes and clusters.
![Simple UI](https://github.com/kubeshark/assets/raw/master/png/kubeshark-ui.png)

View File

@@ -1,123 +0,0 @@
package check
import (
"context"
"fmt"
"regexp"
"time"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
log.Info().Str("procedure", "image-pull-in-cluster").Msg("Checking:")
namespace := "default"
podName := fmt.Sprintf("%s-test", misc.Program)
defer func() {
if err := kubernetesProvider.RemovePod(ctx, namespace, podName); err != nil {
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("While removing test pod!")
}
}()
if err := createImagePullInClusterPod(ctx, kubernetesProvider, namespace, podName); err != nil {
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("While creating test pod!")
return false
}
if err := checkImagePulled(ctx, kubernetesProvider, namespace, podName); err != nil {
log.Printf("%v cluster is not able to pull %s containers from docker hub, err: %v", misc.Program, fmt.Sprintf(utils.Red, "✗"), err)
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("Unable to pull images from Docker Hub!")
return false
}
log.Info().
Str("namespace", namespace).
Str("pod", podName).
Msg("Pulling images from Docker Hub is passed.")
return true
}
func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{namespace}, podWatchHelper)
timeAfter := time.After(30 * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
return err
}
if pod.Status.Phase == core.PodRunning {
return nil
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
return err
case <-timeAfter:
return fmt.Errorf("image not pulled in time")
}
}
}
func createImagePullInClusterPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
image := docker.GetWorkerImage()
log.Info().Str("image", image).Msg("Testing image pull:")
var zero int64
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: "probe",
Image: image,
ImagePullPolicy: "Always",
Command: []string{"cat"},
Stdin: true,
},
},
TerminationGracePeriodSeconds: &zero,
},
}
if _, err := kubernetesProvider.CreatePod(ctx, namespace, pod); err != nil {
return err
}
return nil
}

View File

@@ -15,9 +15,6 @@ func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.SelfNamespace)
allResourcesExist := checkResourceExist(config.Config.SelfNamespace, "namespace", exist, err)
exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.SelfNamespace, kubernetes.ConfigMapName)
allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.SelfNamespace, kubernetes.ServiceAccountName)
allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist

View File

@@ -33,9 +33,6 @@ func runCheck() {
checkPassed = check.KubernetesPermissions(ctx, embedFS, kubernetesProvider)
}
if checkPassed {
checkPassed = check.ImagePullInCluster(ctx, kubernetesProvider)
}
if checkPassed {
checkPassed = check.KubernetesResources(ctx, kubernetesProvider)
}

View File

@@ -10,5 +10,5 @@ func performCleanCommand() {
return
}
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, false)
}

View File

@@ -18,32 +18,32 @@ import (
"github.com/rs/zerolog/log"
)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, serviceName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName, cancel)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, serviceName string, podName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName)
if err != nil {
log.Error().
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", proxyPortLabel))
cancel()
Msg(fmt.Sprintf("Error occured while running K8s proxy. Try setting different port using --%s", proxyPortLabel))
return
}
connector := connect.NewConnector(kubernetes.GetLocalhostOnPort(srcPort), connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(healthCheck); err != nil {
log.Error().Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward..")
log.Warn().
Str("service", serviceName).
Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward...")
if err := httpServer.Shutdown(ctx); err != nil {
log.Error().
Err(errormessage.FormatError(err)).
Msg("Error occurred while stopping proxy.")
}
podRegex, _ := regexp.Compile(kubernetes.HubPodName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx, cancel); err != nil {
podRegex, _ := regexp.Compile(podName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx); err != nil {
log.Error().
Str("pod-regex", podRegex.String()).
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", proxyPortLabel))
cancel()
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port using --%s", proxyPortLabel))
return
}
@@ -53,7 +53,6 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
Str("service", serviceName).
Err(errormessage.FormatError(err)).
Msg("Couldn't connect to service.")
cancel()
return
}
}
@@ -97,11 +96,13 @@ func handleKubernetesProviderError(err error) {
}
}
func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string) {
func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, withoutCleanup bool) {
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
if !withoutCleanup {
resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
}
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {

View File

@@ -26,7 +26,7 @@ func runProxy() {
exists, err := kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.FrontServiceName)
if err != nil {
log.Error().
Str("service", misc.Program).
Str("service", kubernetes.FrontServiceName).
Err(err).
Msg("Failed to found service!")
cancel()
@@ -42,36 +42,96 @@ func runProxy() {
return
}
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
exists, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.HubServiceName)
if err != nil {
log.Error().
Str("service", kubernetes.HubServiceName).
Err(err).
Msg("Failed to found service!")
cancel()
return
}
response, err := http.Get(fmt.Sprintf("%s/", url))
if !exists {
log.Error().
Str("service", kubernetes.HubServiceName).
Str("command", fmt.Sprintf("%s %s", misc.Program, tapCmd.Use)).
Msg("Service not found! You should run the command first:")
cancel()
return
}
var establishedProxy bool
hubUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
response, err := http.Get(fmt.Sprintf("%s/", hubUrl))
if err == nil && response.StatusCode == 200 {
log.Info().
Str("service", kubernetes.HubServiceName).
Int("port", int(config.Config.Tap.Proxy.Hub.SrcPort)).
Msg("Found a running service.")
okToOpen("Hub", hubUrl, true)
} else {
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.HubServiceName,
kubernetes.HubPodName,
configStructs.ProxyHubPortLabel,
config.Config.Tap.Proxy.Hub.SrcPort,
config.Config.Tap.Proxy.Hub.DstPort,
"/echo",
)
connector := connect.NewConnector(hubUrl, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection("/echo"); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Hub."))
return
}
establishedProxy = true
okToOpen("Hub", hubUrl, true)
}
frontUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
response, err = http.Get(fmt.Sprintf("%s/", frontUrl))
if err == nil && response.StatusCode == 200 {
log.Info().
Str("service", kubernetes.FrontServiceName).
Int("port", int(config.Config.Tap.Proxy.Front.SrcPort)).
Msg("Found a running service.")
okToOpen(url)
return
}
log.Info().Msg("Establishing connection to K8s cluster...")
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
okToOpen("Kubeshark", frontUrl, false)
} else {
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.FrontServiceName,
kubernetes.FrontPodName,
configStructs.ProxyFrontPortLabel,
config.Config.Tap.Proxy.Front.SrcPort,
config.Config.Tap.Proxy.Front.DstPort,
"",
)
connector := connect.NewConnector(frontUrl, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(""); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
return
}
connector := connect.NewConnector(url, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(""); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
return
establishedProxy = true
okToOpen("Kubeshark", frontUrl, false)
}
okToOpen(url)
utils.WaitForTermination(ctx, cancel)
if establishedProxy {
utils.WaitForTermination(ctx, cancel)
}
}
func okToOpen(url string) {
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))
func okToOpen(name string, url string, noBrowser bool) {
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", name)))
if !config.Config.HeadlessMode {
if !config.Config.HeadlessMode && !noBrowser {
utils.OpenBrowser(url)
}
}

View File

@@ -85,7 +85,9 @@ func tap() {
if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.ImagePullSecrets(), config.Config.Tap.Debug); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
log.Warn().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance", misc.Software, misc.Program, misc.Software))
log.Info().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance.", misc.Software, misc.Program, misc.Software))
connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)
log.Info().Msg("Updated the targeted pods. Exiting.")
} else {
defer resources.CleanUpSelfResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
log.Error().Err(errormessage.FormatError(err)).Msg("Error creating resources!")
@@ -102,10 +104,13 @@ func tap() {
// block until exit signal or error
utils.WaitForTermination(ctx, cancel)
log.Warn().
Str("command", fmt.Sprintf("%s proxy", misc.Program)).
Msg(fmt.Sprintf(utils.Yellow, "To re-establish a proxy/port-forward, run:"))
}
func finishTapExecution(kubernetesProvider *kubernetes.Provider) {
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, true)
}
/*
@@ -121,62 +126,12 @@ func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernete
printNoPodsFoundSuggestion(namespaces)
}
for _, targetedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
}
return nil
}
}
func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error {
workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{
TargetNamespaces: targetNamespaces,
PodFilterRegex: *config.Config.Tap.PodRegex(),
SelfNamespace: config.Config.SelfNamespace,
WorkerResources: config.Config.Tap.Resources.Worker,
ImagePullPolicy: config.Config.ImagePullPolicy(),
ImagePullSecrets: config.Config.ImagePullSecrets(),
SelfServiceAccountExists: state.selfServiceAccountExists,
ServiceMesh: config.Config.Tap.ServiceMesh,
Tls: config.Config.Tap.Tls,
Debug: config.Config.Tap.Debug,
}, startTime)
if err != nil {
return err
}
go func() {
for {
select {
case syncerErr, ok := <-workerSyncer.ErrorOut:
if !ok {
log.Debug().Msg("workerSyncer err channel closed, ending listener loop")
return
}
log.Error().Msg(getK8sTapManagerErrorText(syncerErr))
cancel()
case _, ok := <-workerSyncer.TapPodChangesOut:
if !ok {
log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
return
}
go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods)
case pod, ok := <-workerSyncer.WorkerPodsChanges:
if !ok {
log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
return
}
go connector.PostWorkerPodToHub(pod)
case <-ctx.Done():
log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
return
}
}
}()
return nil
}
func printNoPodsFoundSuggestion(targetNamespaces []string) {
var suggestionStr string
if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
@@ -185,19 +140,6 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) {
log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, %s will automatically target matching pods if any are created later%s", misc.Software, suggestionStr))
}
func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string {
switch err.TapManagerReason {
case kubernetes.TapManagerPodListError:
return fmt.Sprintf("Failed to update currently targeted pods: %v", err.OriginalError)
case kubernetes.TapManagerPodWatchError:
return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
case kubernetes.TapManagerWorkerUpdateError:
return fmt.Sprintf("Error updating worker: %v", err.OriginalError)
default:
return fmt.Sprintf("Unknown error occured in K8s tap manager: %v", err.OriginalError)
}
}
func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.HubPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
@@ -215,9 +157,9 @@ func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, c
switch wEvent.Type {
case kubernetes.EventAdded:
log.Info().Str("pod", kubernetes.HubPodName).Msg("Added pod.")
log.Info().Str("pod", kubernetes.HubPodName).Msg("Added:")
case kubernetes.EventDeleted:
log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed pod.")
log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed:")
cancel()
return
case kubernetes.EventModified:
@@ -295,9 +237,9 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
switch wEvent.Type {
case kubernetes.EventAdded:
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added pod.")
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added:")
case kubernetes.EventDeleted:
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed pod.")
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed:")
cancel()
return
case kubernetes.EventModified:
@@ -427,19 +369,50 @@ func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Hub.SrcPort, config.Config.Tap.Proxy.Hub.DstPort, "/echo")
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.HubServiceName,
kubernetes.HubPodName,
configStructs.ProxyHubPortLabel,
config.Config.Tap.Proxy.Hub.SrcPort,
config.Config.Tap.Proxy.Hub.DstPort,
"/echo",
)
if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error starting worker syncer")
cancel()
err := kubernetes.CreateWorkers(
kubernetesProvider,
state.selfServiceAccountExists,
ctx,
config.Config.SelfNamespace,
config.Config.Tap.Resources.Worker,
config.Config.ImagePullPolicy(),
config.Config.ImagePullSecrets(),
config.Config.Tap.ServiceMesh,
config.Config.Tap.Tls,
config.Config.Tap.Debug,
)
if err != nil {
log.Error().Err(err).Send()
}
connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
}
func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyHubPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.FrontServiceName,
kubernetes.FrontPodName,
configStructs.ProxyFrontPortLabel,
config.Config.Tap.Proxy.Front.SrcPort,
config.Config.Tap.Proxy.Front.DstPort,
"",
)
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))

View File

@@ -34,7 +34,7 @@ type WorkerConfig struct {
type HubConfig struct {
SrcPort uint16 `yaml:"port" default:"8898"`
DstPort uint16 `yaml:"srvport" default:"8898"`
DstPort uint16 `yaml:"srvport" default:"80"`
}
type FrontConfig struct {

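For orientation, a minimal sketch of what the two ports in HubConfig above mean: SrcPort ("port") is the local port the CLI's proxy or port-forward listens on, while DstPort ("srvport") is the in-cluster Hub service port, whose default changes here from 8898 to 80. The helper below only mirrors how kubernetes.GetLocalhostOnPort appears to be used elsewhere in this diff and is an assumption, not the repo's actual implementation.

package main

import "fmt"

// getLocalhostOnPort is an assumed stand-in for kubernetes.GetLocalhostOnPort,
// which the hunks in this diff use to build the URL printed to the user.
func getLocalhostOnPort(port uint16) string {
	return fmt.Sprintf("http://localhost:%d", port)
}

func main() {
	// Defaults from the HubConfig struct above: local port 8898, Hub service port 80.
	var srcPort, dstPort uint16 = 8898, 80
	fmt.Println("Hub is proxied locally at:", getLocalhostOnPort(srcPort))
	fmt.Println("and forwards to in-cluster service port:", dstPort)
}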
View File

@@ -12,7 +12,6 @@ import (
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)
@@ -64,7 +63,6 @@ func (connector *Connector) isReachable(path string) (bool, error) {
}
func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
// TODO: This request is responsible for proxy_server.go:147] Error while proxying request: context canceled log
postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)
if podMarshalled, err := json.Marshal(pod); err != nil {
@@ -116,22 +114,32 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
}
}
func (connector *Connector) PostTargetedPodsToHub(pods []core.Pod) {
postTargetedUrl := fmt.Sprintf("%s/pods/targeted", connector.url)
type postRegexRequest struct {
Regex string `json:"regex"`
Namespaces []string `json:"namespaces"`
}
if podsMarshalled, err := json.Marshal(pods); err != nil {
log.Error().Err(err).Msg("Failed to marshal the targeted pods:")
func (connector *Connector) PostRegexToHub(regex string, namespaces []string) {
postRegexUrl := fmt.Sprintf("%s/pods/regex", connector.url)
payload := postRegexRequest{
Regex: regex,
Namespaces: namespaces,
}
if payloadMarshalled, err := json.Marshal(payload); err != nil {
log.Error().Err(err).Msg("Failed to marshal the payload:")
} else {
ok := false
for !ok {
if _, err = utils.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
if _, err = utils.Post(postRegexUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the targeted pods to Hub:")
log.Debug().Err(err).Msg("Failed sending the payload to Hub:")
} else {
ok = true
log.Debug().Int("pod-count", len(pods)).Msg("Reported targeted pods to Hub:")
log.Debug().Str("regex", regex).Strs("namespaces", namespaces).Msg("Reported payload to Hub:")
}
time.Sleep(time.Second)
}
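For reference, a minimal standalone sketch of the new targeting flow introduced in this hunk: instead of POSTing the full list of targeted pods, the CLI now sends only the pod regex and namespaces to the Hub's /pods/regex endpoint and lets the Hub resolve matching pods itself. The regex, the namespace list and the localhost:8898 address below are illustrative assumptions; only the payload shape and the endpoint path come from the diff above.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// postRegexRequest mirrors the payload struct added to connect.go above.
type postRegexRequest struct {
	Regex      string   `json:"regex"`
	Namespaces []string `json:"namespaces"`
}

func main() {
	// Hypothetical regex and namespaces, chosen only for illustration.
	payload := postRegexRequest{
		Regex:      "(catalogue|front-end)",
		Namespaces: []string{"sock-shop"},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"regex":"(catalogue|front-end)","namespaces":["sock-shop"]}

	// The /pods/regex path comes from the hunk above; localhost:8898 is an
	// assumption based on the default Hub proxy port elsewhere in this diff.
	resp, err := http.Post("http://localhost:8898/pods/regex", "application/json", bytes.NewBuffer(body))
	if err != nil {
		fmt.Println("Hub not reachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("Hub responded with:", resp.Status)
}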

View File

@@ -14,7 +14,6 @@ const (
ServiceAccountName = SelfResourcesPrefix + "service-account"
WorkerDaemonSetName = SelfResourcesPrefix + "worker-daemon-set"
WorkerPodName = SelfResourcesPrefix + "worker"
ConfigMapName = SelfResourcesPrefix + "config"
MinKubernetesServerVersion = "1.16.0"
)

View File

@@ -182,9 +182,6 @@ type PodOptions struct {
}
func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)
@@ -264,9 +261,6 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
}
func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPort string) (*core.Pod, error) {
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)
@@ -317,7 +311,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
Env: []core.EnvVar{
{
Name: "REACT_APP_DEFAULT_FILTER",
Value: "timestamp >= now()",
Value: " ",
},
{
Name: "REACT_APP_HUB_HOST",
@@ -419,11 +413,6 @@ func (provider *Provider) DoesNamespaceExist(ctx context.Context, name string) (
return provider.doesResourceExist(namespaceResource, err)
}
func (provider *Provider) DoesConfigMapExist(ctx context.Context, namespace string, name string) (bool, error) {
configMapResource, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(configMapResource, err)
}
func (provider *Provider) DoesServiceAccountExist(ctx context.Context, namespace string, name string) (bool, error) {
serviceAccountResource, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(serviceAccountResource, err)
@@ -663,7 +652,6 @@ func (provider *Provider) ApplyWorkerDaemonSet(
daemonSetName string,
podImage string,
workerPodName string,
nodeNames []string,
serviceAccountName string,
resources Resources,
imagePullPolicy core.PullPolicy,
@@ -673,17 +661,12 @@ func (provider *Provider) ApplyWorkerDaemonSet(
debug bool,
) error {
log.Debug().
Int("node-count", len(nodeNames)).
Str("namespace", namespace).
Str("daemonset-name", daemonSetName).
Str("image", podImage).
Str("pod", workerPodName).
Msg("Applying worker DaemonSets.")
if len(nodeNames) == 0 {
return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName)
}
command := []string{"./worker", "-i", "any", "-port", "8897"}
if debug {
@@ -763,22 +746,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
workerResources := applyconfcore.ResourceRequirements().WithRequests(workerResourceRequests).WithLimits(workerResourceLimits)
workerContainer.WithResources(workerResources)
matchFields := make([]*applyconfcore.NodeSelectorTermApplyConfiguration, 0)
for _, nodeName := range nodeNames {
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("metadata.name")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
nodeSelectorRequirement.WithValues(nodeName)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement)
matchFields = append(matchFields, nodeSelectorTerm)
}
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(matchFields...)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)

View File

@@ -21,7 +21,7 @@ import (
const k8sProxyApiPrefix = "/"
const selfServicePort = 80
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string, cancel context.CancelFunc) (*http.Server, error) {
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string) (*http.Server, error) {
log.Info().
Str("namespace", selfNamespace).
Str("service", selfServiceName).
@@ -55,7 +55,7 @@ func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16,
go func() {
if err := server.Serve(l); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("While creating proxy!")
cancel()
return
}
}()
@@ -99,7 +99,7 @@ func getRerouteHttpHandlerSelfStatic(proxyHandler http.Handler, selfNamespace st
})
}
func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context, cancel context.CancelFunc) (*portforward.PortForwarder, error) {
func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context) (*portforward.PortForwarder, error) {
pods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, podRegex, []string{namespace})
if err != nil {
return nil, err
@@ -132,7 +132,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *re
go func() {
if err = forwarder.ForwardPorts(); err != nil {
log.Error().Err(err).Msg("While Kubernetes port-forwarding!")
cancel()
return
}
}()

View File

@@ -1,8 +1,6 @@
package kubernetes
import (
"regexp"
"github.com/kubeshark/base/pkg/models"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,44 +46,6 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus {
return result
}
func excludeSelfPods(pods []core.Pod) []core.Pod {
selfPrefixRegex := regexp.MustCompile("^" + SelfResourcesPrefix)
nonSelfPods := make([]core.Pod, 0)
for _, pod := range pods {
if !selfPrefixRegex.MatchString(pod.Name) {
nonSelfPods = append(nonSelfPods, pod)
}
}
return nonSelfPods
}
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
added = getMissingPods(newPods, oldPods)
removed = getMissingPods(oldPods, newPods)
return added, removed
}
//returns pods present in pods1 array and missing in pods2 array
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
missingPods := make([]core.Pod, 0)
for _, pod1 := range pods1 {
var found = false
for _, pod2 := range pods2 {
if pod1.UID == pod2.UID {
found = true
break
}
}
if !found {
missingPods = append(missingPods, pod1)
}
}
return missingPods
}
func GetPodInfosForPods(pods []core.Pod) []*models.PodInfo {
podInfos := make([]*models.PodInfo, 0)
for _, pod := range pods {

View File

@@ -1,391 +0,0 @@
package kubernetes
import (
"context"
"fmt"
"regexp"
"time"
"github.com/kubeshark/base/pkg/models"
"github.com/kubeshark/kubeshark/debounce"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
)
const updateWorkersDelay = 5 * time.Second
type TargetedPodChangeEvent struct {
Added []v1.Pod
Removed []v1.Pod
}
// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
type WorkerSyncer struct {
startTime time.Time
context context.Context
CurrentlyTargetedPods []v1.Pod
config WorkerSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TargetedPodChangeEvent
WorkerPodsChanges chan *v1.Pod
ErrorOut chan K8sTapManagerError
nodeToTargetedPodMap models.NodeToPodsMap
targetedNodes []string
}
type WorkerSyncerConfig struct {
TargetNamespaces []string
PodFilterRegex regexp.Regexp
SelfNamespace string
WorkerResources Resources
ImagePullPolicy v1.PullPolicy
ImagePullSecrets []v1.LocalObjectReference
SelfServiceAccountExists bool
ServiceMesh bool
Tls bool
Debug bool
}
func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
syncer := &WorkerSyncer{
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTargetedPods: make([]v1.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TargetedPodChangeEvent, 100),
WorkerPodsChanges: make(chan *v1.Pod, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTargetedPods(); err != nil {
return nil, err
}
if err := syncer.updateWorkers(); err != nil {
return nil, err
}
go syncer.watchPodsForTargeting()
go syncer.watchWorkerEvents()
go syncer.watchWorkerPods()
return syncer, nil
}
func (workerSyncer *WorkerSyncer) watchWorkerPods() {
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.SelfNamespace}, podWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
log.Error().Str("pod", WorkerPodName).Err(err).Msg(fmt.Sprintf("While parsing %s resource!", misc.Software))
continue
}
log.Debug().
Str("pod", pod.Name).
Str("node", pod.Spec.NodeName).
Interface("phase", pod.Status.Phase).
Msg("Watching pod events...")
if pod.Spec.NodeName != "" {
workerSyncer.WorkerPodsChanges <- pod
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!")
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", WorkerPodName).
Msg("Watching pod, context done.")
return
}
}
}
func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex, "pod")
eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.SelfNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
log.Error().
Str("pod", WorkerPodName).
Err(err).
Msg("Parsing resource event.")
continue
}
log.Debug().
Str("pod", WorkerPodName).
Str("event", event.Name).
Time("time", event.CreationTimestamp.Time).
Str("name", event.Regarding.Name).
Str("kind", event.Regarding.Kind).
Str("reason", event.Reason).
Str("note", event.Note).
Msg("Watching events.")
pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.SelfNamespace, event.Regarding.Name)
if err1 != nil {
log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod")
continue
}
workerSyncer.WorkerPodsChanges <- pod
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
log.Error().
Str("pod", WorkerPodName).
Err(err).
Msg("While watching events.")
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", WorkerPodName).
Msg("Watching pod events, context done.")
return
}
}
}
func (workerSyncer *WorkerSyncer) watchPodsForTargeting() {
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
handleChangeInPods := func() {
err, changeFound := workerSyncer.updateCurrentlyTargetedPods()
if err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodListError,
}
}
if !changeFound {
log.Debug().Msg("Nothing changed. Updating workers is not needed.")
return
}
if err := workerSyncer.updateWorkers(); err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerWorkerUpdateError,
}
}
}
restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
}
switch wEvent.Type {
case EventAdded:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Added matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventDeleted:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Removed matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventModified:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Str("ip", pod.Status.PodIP).
Interface("phase", pod.Status.Phase).
Msg("Modified matching pod.")
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
}
case EventBookmark:
break
case EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
case <-workerSyncer.context.Done():
log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
return
}
}
}
func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) {
log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (workerSyncer *WorkerSyncer) updateCurrentlyTargetedPods() (err error, changesFound bool) {
if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
return err, false
} else {
podsToTarget := excludeSelfPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargetedPods, podsToTarget)
for _, addedPod := range addedPods {
log.Info().Str("pod", addedPod.Name).Msg("Currently targeting:")
}
for _, removedPod := range removedPods {
log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targeting is stopped.")
}
if len(addedPods) > 0 || len(removedPods) > 0 {
workerSyncer.CurrentlyTargetedPods = podsToTarget
workerSyncer.nodeToTargetedPodMap = GetNodeHostToTargetedPodsMap(workerSyncer.CurrentlyTargetedPods)
workerSyncer.TapPodChangesOut <- TargetedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
}
return nil, true
}
return nil, false
}
}
func (workerSyncer *WorkerSyncer) updateWorkers() error {
nodesToTarget := make([]string, len(workerSyncer.nodeToTargetedPodMap))
i := 0
for node := range workerSyncer.nodeToTargetedPodMap {
nodesToTarget[i] = node
i++
}
if utils.EqualStringSlices(nodesToTarget, workerSyncer.targetedNodes) {
log.Debug().Msg("Skipping apply, DaemonSet is up to date")
return nil
}
log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.")
image := docker.GetWorkerImage()
if len(workerSyncer.nodeToTargetedPodMap) > 0 {
var serviceAccountName string
if workerSyncer.config.SelfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
nodeNames := make([]string, 0, len(workerSyncer.nodeToTargetedPodMap))
for nodeName := range workerSyncer.nodeToTargetedPodMap {
nodeNames = append(nodeNames, nodeName)
}
if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName,
nodeNames,
serviceAccountName,
workerSyncer.config.WorkerResources,
workerSyncer.config.ImagePullPolicy,
workerSyncer.config.ImagePullSecrets,
workerSyncer.config.ServiceMesh,
workerSyncer.config.Tls,
workerSyncer.config.Debug); err != nil {
return err
}
log.Debug().Int("worker-count", len(workerSyncer.nodeToTargetedPodMap)).Msg("Successfully created workers.")
} else {
if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName); err != nil {
return err
}
log.Debug().Msg("Successfully resetted Worker DaemonSet")
}
workerSyncer.targetedNodes = nodesToTarget
return nil
}

kubernetes/workers.go (new file, 54 lines)
View File

@@ -0,0 +1,54 @@
package kubernetes
import (
"context"
"github.com/kubeshark/kubeshark/docker"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
)
func CreateWorkers(
kubernetesProvider *Provider,
selfServiceAccountExists bool,
ctx context.Context,
namespace string,
resources Resources,
imagePullPolicy core.PullPolicy,
imagePullSecrets []core.LocalObjectReference,
serviceMesh bool,
tls bool,
debug bool,
) error {
image := docker.GetWorkerImage()
var serviceAccountName string
if selfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
log.Info().Msg("Creating the worker DaemonSet...")
if err := kubernetesProvider.ApplyWorkerDaemonSet(
ctx,
namespace,
WorkerDaemonSetName,
image,
WorkerPodName,
serviceAccountName,
resources,
imagePullPolicy,
imagePullSecrets,
serviceMesh,
tls,
debug,
); err != nil {
return err
}
log.Info().Msg("Successfully created the worker DaemonSet.")
return nil
}

View File

@@ -113,11 +113,6 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveConfigMap(ctx, selfResourcesNamespace, kubernetes.ConfigMapName); err != nil {
resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", kubernetes.ConfigMapName, selfResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if resources, err := kubernetesProvider.ListManagedServiceAccounts(ctx, selfResourcesNamespace); err != nil {
resourceDesc := fmt.Sprintf("ServiceAccounts in namespace %s", selfResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)

View File

@@ -5,8 +5,8 @@ const (
Red = "\033[1;31m%s\033[0m"
Green = "\033[1;32m%s\033[0m"
Yellow = "\033[1;33m%s\033[0m"
Purple = "\033[1;34m%s\033[0m"
Blue = "\033[1;34m%s\033[0m"
Magenta = "\033[1;35m%s\033[0m"
Teal = "\033[1;36m%s\033[0m"
Cyan = "\033[1;36m%s\033[0m"
White = "\033[1;37m%s\033[0m"
)
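As a side note, these constants are printf-style ANSI color wrappers. A minimal sketch of how the CLI code in this diff uses them (for example fmt.Sprintf(utils.Green, "Hub is available at:")), with the two values copied verbatim from the hunk above:

package main

import "fmt"

// Copied from the hunk above: "%s" wrapped in ANSI bold-color escape codes.
const (
	Red   = "\033[1;31m%s\033[0m"
	Green = "\033[1;32m%s\033[0m"
)

func main() {
	// Matches the usage pattern seen throughout this diff, e.g.
	// log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Hub.")).
	fmt.Println(fmt.Sprintf(Green, "Hub is available at:"))
	fmt.Println(fmt.Sprintf(Red, "Couldn't connect to Hub."))
}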