🎨 Replace the tap/tapper terminology with deploy, worker and targetted

M. Mert Yildiran 2022-11-29 07:31:36 +03:00
parent ae278526ab
commit 6ca0fe137e
GPG Key ID: DA5D6DCBB758A461
18 changed files with 250 additions and 250 deletions
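Editorial note: the change is a mechanical rename across the 18 files below. The snippet here is a condensed, non-exhaustive summary of the identifier mapping collected from the hunks that follow — illustration only, not part of the commit itself.

```go
// Summary of the rename, gathered from the diff below (illustrative only).
package main

import "fmt"

func main() {
	renames := map[string]string{
		"TapKubernetesPermissions":   "KubernetesPermissions",
		"RunKubesharkTap":            "deploy",
		"CreateTapKubesharkResources": "CreateHubResources",
		"TapperPodName":              "WorkerPodName",
		"TapperDaemonSetName":        "WorkerDaemonSetName",
		"KubesharkTapperSyncer":      "WorkerSyncer",
		"TapperSyncerConfig":         "WorkerSyncerConfig",
		"CurrentlyTappedPods":        "CurrentlyTargettedPods",
		"ReportTappedPods":           "ReportTargettedPods",
		"ReportTapperStatus":         "ReportWorkerStatus",
		"K8sTapManagerError":         "K8sDeployManagerError",
		"GuiPortTapName":             "ProxyPortLabel",
	}
	for oldName, newName := range renames {
		fmt.Printf("%-28s -> %s\n", oldName, newName)
	}
}
```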

View File

@ -12,7 +12,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
)
func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool {
func KubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool {
log.Info().Str("procedure", "kubernetes-permissions").Msg("Checking:")
var filePath string

View File

@ -66,33 +66,33 @@ func checkPodResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.
Str("name", kubernetes.HubPodName).
Msg("Pod is running.")
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.ResourcesNamespace, kubernetes.TapperPodName); err != nil {
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.ResourcesNamespace, kubernetes.WorkerPodName); err != nil {
log.Error().
Str("name", kubernetes.TapperPodName).
Str("name", kubernetes.WorkerPodName).
Err(err).
Msg("While checking if pods are running!")
return false
} else {
tappers := 0
notRunningTappers := 0
workers := 0
notRunningWorkers := 0
for _, pod := range pods {
tappers += 1
workers += 1
if !kubernetes.IsPodRunning(&pod) {
notRunningTappers += 1
notRunningWorkers += 1
}
}
if notRunningTappers > 0 {
if notRunningWorkers > 0 {
log.Error().
Str("name", kubernetes.TapperPodName).
Msg(fmt.Sprintf("%d/%d pods are not running!", notRunningTappers, tappers))
Str("name", kubernetes.WorkerPodName).
Msg(fmt.Sprintf("%d/%d pods are not running!", notRunningWorkers, workers))
return false
}
log.Info().
Str("name", kubernetes.TapperPodName).
Msg(fmt.Sprintf("All %d pods are running.", tappers))
Str("name", kubernetes.WorkerPodName).
Msg(fmt.Sprintf("All %d pods are running.", workers))
return true
}
}
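Since the diff interleaves old and new lines, here is a standalone sketch of the renamed worker-pod check above, with a stand-in pod type instead of core.Pod and plain fmt output instead of zerolog (both are simplifications for illustration):

```go
// Count running vs. not-running worker pods, mirroring the renamed check.
package main

import "fmt"

type pod struct {
	Name    string
	Running bool
}

func allWorkersRunning(pods []pod) bool {
	workers := 0
	notRunningWorkers := 0
	for _, p := range pods {
		workers++
		if !p.Running {
			notRunningWorkers++
		}
	}
	if notRunningWorkers > 0 {
		fmt.Printf("%d/%d pods are not running!\n", notRunningWorkers, workers)
		return false
	}
	fmt.Printf("All %d pods are running.\n", workers)
	return true
}

func main() {
	pods := []pod{{"kubeshark-worker-abc", true}, {"kubeshark-worker-def", false}}
	fmt.Println(allWorkersRunning(pods)) // "1/2 pods are not running!" then "false"
}
```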

View File

@ -29,7 +29,7 @@ func runKubesharkCheck() {
}
if checkPassed {
checkPassed = check.TapKubernetesPermissions(ctx, embedFS, kubernetesProvider)
checkPassed = check.KubernetesPermissions(ctx, embedFS, kubernetesProvider)
}
if checkPassed {
@ -47,8 +47,8 @@ func runKubesharkCheck() {
log.Info().Msg(fmt.Sprintf(utils.Green, "All checks are passed."))
} else {
log.Error().
Str("command1", "kubeshark clean").
Str("command2", "kubeshark tap").
Str("command1", fmt.Sprintf("kubeshark %s", cleanCmd.Use)).
Str("command2", fmt.Sprintf("kubeshark %s", deployCmd.Use)).
Msg(fmt.Sprintf(utils.Red, "There are issues in your deployment! Run these commands:"))
os.Exit(1)
}

View File

@ -26,7 +26,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
if err != nil {
log.Error().
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", configStructs.GuiPortTapName))
Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", configStructs.ProxyPortLabel))
cancel()
return
}
@ -45,7 +45,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
log.Error().
Str("pod-regex", podRegex.String()).
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", configStructs.GuiPortTapName))
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", configStructs.ProxyPortLabel))
cancel()
return
}

View File

@ -16,7 +16,7 @@ var deployCmd = &cobra.Command{
Short: "Deploy Kubeshark into your K8s cluster.",
Long: `Deploy Kubeshark into your K8s cluster to gain visibility.`,
RunE: func(cmd *cobra.Command, args []string) error {
RunKubesharkTap()
deploy()
return nil
},
PreRunE: func(cmd *cobra.Command, args []string) error {
@ -46,15 +46,15 @@ func init() {
log.Debug().Err(err).Send()
}
deployCmd.Flags().Uint16P(configStructs.GuiPortTapName, "p", defaultDeployConfig.GuiPort, "Provide a custom port for the web interface webserver")
deployCmd.Flags().StringSliceP(configStructs.NamespacesTapName, "n", defaultDeployConfig.Namespaces, "Namespaces selector")
deployCmd.Flags().BoolP(configStructs.AllNamespacesTapName, "A", defaultDeployConfig.AllNamespaces, "Tap all namespaces")
deployCmd.Flags().Bool(configStructs.EnableRedactionTapName, defaultDeployConfig.EnableRedaction, "Enables redaction of potentially sensitive request/response headers and body values")
deployCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultDeployConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
deployCmd.Flags().Uint16P(configStructs.ProxyPortLabel, "p", defaultDeployConfig.ProxyPort, "Provide a custom port for the web interface webserver.")
deployCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultDeployConfig.Namespaces, "Namespaces selector.")
deployCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultDeployConfig.AllNamespaces, "Deploy to all namespaces.")
deployCmd.Flags().Bool(configStructs.EnableRedactionLabel, defaultDeployConfig.EnableRedaction, "Enables redaction of potentially sensitive request/response headers and body values.")
deployCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeLabel, defaultDeployConfig.HumanMaxEntriesDBSize, "Override the default max entries db size.")
deployCmd.Flags().String(configStructs.InsertionFilterName, defaultDeployConfig.InsertionFilter, "Set the insertion filter. Accepts string or a file path.")
deployCmd.Flags().Bool(configStructs.DryRunTapName, defaultDeployConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
deployCmd.Flags().Bool(configStructs.ServiceMeshName, defaultDeployConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
deployCmd.Flags().Bool(configStructs.TlsName, defaultDeployConfig.Tls, "Record tls traffic")
deployCmd.Flags().Bool(configStructs.ProfilerName, defaultDeployConfig.Profiler, "Run pprof server")
deployCmd.Flags().Int(configStructs.MaxLiveStreamsName, defaultDeployConfig.MaxLiveStreams, "Maximum live tcp streams to handle concurrently")
deployCmd.Flags().Bool(configStructs.DryRunLabel, defaultDeployConfig.DryRun, "Preview of all pods matching the regex, without deploying workers on them.")
deployCmd.Flags().Bool(configStructs.ServiceMeshName, defaultDeployConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls.")
deployCmd.Flags().Bool(configStructs.TlsName, defaultDeployConfig.Tls, "Record tls traffic.")
deployCmd.Flags().Bool(configStructs.ProfilerName, defaultDeployConfig.Profiler, "Run pprof server.")
deployCmd.Flags().Int(configStructs.MaxLiveStreamsName, defaultDeployConfig.MaxLiveStreams, "Maximum live tcp streams to handle concurrently.")
}
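The flag renames above are user-facing: `--gui-port` becomes `--proxy-port`, while the `-p` shorthand and the 8899 default stay. Below is a minimal, hypothetical cobra sketch of registering and reading back the renamed flag; the command wiring is illustrative, not Kubeshark's actual init code.

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

const ProxyPortLabel = "proxy-port" // was GuiPortTapName = "gui-port"

func main() {
	deployCmd := &cobra.Command{
		Use: "deploy",
		RunE: func(cmd *cobra.Command, args []string) error {
			port, err := cmd.Flags().GetUint16(ProxyPortLabel)
			if err != nil {
				return err
			}
			fmt.Printf("proxying the web interface on port %d\n", port)
			return nil
		},
	}
	// The -p shorthand is kept even though the long flag name changed.
	deployCmd.Flags().Uint16P(ProxyPortLabel, "p", 8899, "Provide a custom port for the web interface webserver.")

	deployCmd.SetArgs([]string{"--proxy-port", "9000"})
	_ = deployCmd.Execute()
}
```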

View File

@ -27,19 +27,19 @@ import (
const cleanupTimeout = time.Minute
type tapState struct {
type deployState struct {
startTime time.Time
targetNamespaces []string
kubesharkServiceAccountExists bool
}
var state tapState
var state deployState
var connector *connect.Connector
var hubPodReady bool
var frontPodReady bool
var proxyDone bool
func RunKubesharkTap() {
func deploy() {
state.startTime = time.Now()
connector = connect.NewConnector(kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort), connect.DefaultRetries, connect.DefaultTimeout)
@ -63,14 +63,14 @@ func RunKubesharkTap() {
if config.Config.IsNsRestrictedMode() {
if len(state.targetNamespaces) != 1 || !utils.Contains(state.targetNamespaces, config.Config.ResourcesNamespace) {
log.Error().Msg(fmt.Sprintf("Kubeshark can't resolve IPs in other namespaces when running in namespace restricted mode. You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.ResourcesNamespaceConfigName))
log.Error().Msg(fmt.Sprintf("Kubeshark can't resolve IPs in other namespaces when running in namespace restricted mode. You can use the same namespace for --%s and --%s", configStructs.NamespacesLabel, config.ResourcesNamespaceConfigName))
return
}
}
log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:")
if err := printTappedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!")
}
@ -79,7 +79,7 @@ func RunKubesharkTap() {
}
log.Info().Msg("Waiting for Kubeshark deployment to finish...")
if state.kubesharkServiceAccountExists, err = resources.CreateTapKubesharkResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace, config.Config.Deploy.MaxEntriesDBSizeBytes(), config.Config.Deploy.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Deploy.Profiler); err != nil {
if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace, config.Config.Deploy.MaxEntriesDBSizeBytes(), config.Config.Deploy.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Deploy.Profiler); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
log.Info().Msg("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance")
@ -91,7 +91,7 @@ func RunKubesharkTap() {
return
}
defer finishTapExecution(kubernetesProvider)
defer finishDeployExecution(kubernetesProvider)
go goUtils.HandleExcWrapper(watchHubEvents, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchHubPod, ctx, kubernetesProvider, cancel)
@ -101,7 +101,7 @@ func RunKubesharkTap() {
utils.WaitForFinish(ctx, cancel)
}
func finishTapExecution(kubernetesProvider *kubernetes.Provider) {
func finishDeployExecution(kubernetesProvider *kubernetes.Provider) {
finishKubesharkExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace)
}
@ -110,7 +110,7 @@ func getDeployConfig() *models.Config {
MaxDBSizeBytes: config.Config.Deploy.MaxEntriesDBSizeBytes(),
InsertionFilter: config.Config.Deploy.GetInsertionFilter(),
PullPolicy: config.Config.ImagePullPolicyStr,
TapperResources: config.Config.Deploy.TapperResources,
TapperResources: config.Config.Deploy.WorkerResources,
KubesharkResourcesNamespace: config.Config.ResourcesNamespace,
AgentDatabasePath: models.DataDirPath,
ServiceMap: config.Config.ServiceMap,
@ -121,30 +121,30 @@ func getDeployConfig() *models.Config {
}
/*
this function is a bit problematic as it might be detached from the actual pods the Kubeshark Hub will tap.
This function is a bit problematic as it might be detached from the actual pods that Kubeshark targets.
The alternative would be to wait for the Hub to be ready and then query it for the pods it listens to; this has
the arguably worse drawback of taking a relatively long time before the user sees which pods are targeted, if any.
*/
func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Deploy.PodRegex(), namespaces); err != nil {
return err
} else {
if len(matchingPods) == 0 {
printNoPodsFoundSuggestion(namespaces)
}
for _, tappedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, tappedPod.Name)))
for _, targettedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name)))
}
return nil
}
}
func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error {
tapperSyncer, err := kubernetes.CreateAndStartKubesharkTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error {
workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{
TargetNamespaces: targetNamespaces,
PodFilterRegex: *config.Config.Deploy.PodRegex(),
KubesharkResourcesNamespace: config.Config.ResourcesNamespace,
TapperResources: config.Config.Deploy.TapperResources,
WorkerResources: config.Config.Deploy.WorkerResources,
ImagePullPolicy: config.Config.ImagePullPolicy(),
LogLevel: config.Config.LogLevel(),
KubesharkApiFilteringOptions: api.TrafficFilteringOptions{
@ -163,31 +163,31 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
go func() {
for {
select {
case syncerErr, ok := <-tapperSyncer.ErrorOut:
case syncerErr, ok := <-workerSyncer.ErrorOut:
if !ok {
log.Debug().Msg("kubesharkTapperSyncer err channel closed, ending listener loop")
log.Debug().Msg("workerSyncer err channel closed, ending listener loop")
return
}
log.Error().Msg(getErrorDisplayTextForK8sTapManagerError(syncerErr))
log.Error().Msg(getK8sDeployManagerErrorText(syncerErr))
cancel()
case _, ok := <-tapperSyncer.TapPodChangesOut:
case _, ok := <-workerSyncer.DeployPodChangesOut:
if !ok {
log.Debug().Msg("kubesharkTapperSyncer pod changes channel closed, ending listener loop")
log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
return
}
if err := connector.ReportTappedPods(tapperSyncer.CurrentlyTappedPods); err != nil {
log.Error().Err(err).Msg("failed update tapped pods.")
if err := connector.ReportTargettedPods(workerSyncer.CurrentlyTargettedPods); err != nil {
log.Error().Err(err).Msg("failed update targetted pods.")
}
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
case workerStatus, ok := <-workerSyncer.WorkerStatusChangedOut:
if !ok {
log.Debug().Msg("kubesharkTapperSyncer tapper status changed channel closed, ending listener loop")
log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
return
}
if err := connector.ReportTapperStatus(tapperStatus); err != nil {
log.Error().Err(err).Msg("failed update tapper status.")
if err := connector.ReportWorkerStatus(workerStatus); err != nil {
log.Error().Err(err).Msg("failed update worker status.")
}
case <-ctx.Done():
log.Debug().Msg("kubesharkTapperSyncer event listener loop exiting due to context done")
log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
return
}
}
@ -199,21 +199,21 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
func printNoPodsFoundSuggestion(targetNamespaces []string) {
var suggestionStr string
if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
suggestionStr = ". You can also try selecting a different namespace with -n or tap all namespaces with -A"
suggestionStr = ". You can also try selecting a different namespace with -n or target all namespaces with -A"
}
log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, kubeshark will automatically tap matching pods if any are created later%s", suggestionStr))
log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, kubeshark will automatically target matching pods if any are created later%s", suggestionStr))
}
func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError) string {
switch err.TapManagerReason {
case kubernetes.TapManagerPodListError:
return fmt.Sprintf("Failed to update currently tapped pods: %v", err.OriginalError)
case kubernetes.TapManagerPodWatchError:
return fmt.Sprintf("Error occured in k8s pod watch: %v", err.OriginalError)
case kubernetes.TapManagerTapperUpdateError:
return fmt.Sprintf("Error updating tappers: %v", err.OriginalError)
func getK8sDeployManagerErrorText(err kubernetes.K8sDeployManagerError) string {
switch err.DeployManagerReason {
case kubernetes.DeployManagerPodListError:
return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError)
case kubernetes.DeployManagerPodWatchError:
return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
case kubernetes.DeployManagerWorkerUpdateError:
return fmt.Sprintf("Error updating worker: %v", err.OriginalError)
default:
return fmt.Sprintf("Unknown error occured in k8s tap manager: %v", err.OriginalError)
return fmt.Sprintf("Unknown error occured in K8s deploy manager: %v", err.OriginalError)
}
}
@ -450,8 +450,8 @@ func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider
func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, "/echo")
if err := startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error starting kubeshark tapper syncer")
if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error starting kubeshark worker syncer")
cancel()
}
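The listener loop above fans in three channels from the renamed WorkerSyncer: errors, pod changes, and worker status updates. Below is a simplified, self-contained analogue of that pattern — a select loop that drains the channels until the context is done. The channel roles mirror the diff; the payload types are stand-ins.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type workerStatus struct{ Name, Node, Status string }

// listen fans in several channels with a single select loop, exiting when a
// channel closes or the context is cancelled.
func listen(ctx context.Context, errorOut <-chan error, podChanges <-chan struct{}, statusChanged <-chan workerStatus) {
	for {
		select {
		case err, ok := <-errorOut:
			if !ok {
				fmt.Println("workerSyncer err channel closed, ending listener loop")
				return
			}
			fmt.Println("syncer error:", err)
		case _, ok := <-podChanges:
			if !ok {
				fmt.Println("workerSyncer pod changes channel closed, ending listener loop")
				return
			}
			fmt.Println("reporting targetted pods to Hub")
		case st, ok := <-statusChanged:
			if !ok {
				return
			}
			fmt.Println("reporting worker status to Hub:", st)
		case <-ctx.Done():
			fmt.Println("workerSyncer event listener loop exiting due to context done")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	statusChanged := make(chan workerStatus, 1)
	statusChanged <- workerStatus{"kubeshark-worker-abc", "node-1", "Running"}
	listen(ctx, make(chan error), make(chan struct{}), statusChanged)
}
```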

View File

@ -34,7 +34,7 @@ func runOpen() {
if !exists {
log.Error().
Str("service", kubernetes.FrontServiceName).
Str("command", "kubeshark tap").
Str("command", fmt.Sprintf("kubeshark %s", deployCmd.Use)).
Msg("Service not found! You should run the command first:")
cancel()
return

View File

@ -36,7 +36,7 @@ func init() {
}
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the tapCmd.
// This is called by main.main(). It only needs to happen once to the deployCmd.
func Execute() {
cobra.CheckErr(rootCmd.Execute())
}

View File

@ -13,22 +13,22 @@ import (
)
const (
GuiPortTapName = "gui-port"
NamespacesTapName = "namespaces"
AllNamespacesTapName = "all-namespaces"
EnableRedactionTapName = "redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
InsertionFilterName = "insertion-filter"
DryRunTapName = "dry-run"
ServiceMeshName = "service-mesh"
TlsName = "tls"
ProfilerName = "profiler"
MaxLiveStreamsName = "max-live-streams"
ProxyPortLabel = "proxy-port"
NamespacesLabel = "namespaces"
AllNamespacesLabel = "all-namespaces"
EnableRedactionLabel = "redact"
HumanMaxEntriesDBSizeLabel = "max-entries-db-size"
InsertionFilterName = "insertion-filter"
DryRunLabel = "dry-run"
ServiceMeshName = "service-mesh"
TlsName = "tls"
ProfilerName = "profiler"
MaxLiveStreamsName = "max-live-streams"
)
type DeployConfig struct {
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
ProxyPort uint16 `yaml:"proxy-port" default:"8899"`
ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"`
Namespaces []string `yaml:"namespaces"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
@ -45,7 +45,7 @@ type DeployConfig struct {
InsertionFilter string `yaml:"insertion-filter" default:""`
DryRun bool `yaml:"dry-run" default:"false"`
HubResources models.Resources `yaml:"hub-resources"`
TapperResources models.Resources `yaml:"tapper-resources"`
WorkerResources models.Resources `yaml:"worker-resources"`
ServiceMesh bool `yaml:"service-mesh" default:"false"`
Tls bool `yaml:"tls" default:"false"`
PacketCapture string `yaml:"packet-capture" default:"libpcap"`
@ -126,7 +126,7 @@ func (config *DeployConfig) Validate() error {
_, parseHumanDataSizeErr := utils.HumanReadableToBytes(config.HumanMaxEntriesDBSize)
if parseHumanDataSizeErr != nil {
return fmt.Errorf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize)
return fmt.Errorf("Could not parse --%s value %s", HumanMaxEntriesDBSizeLabel, config.HumanMaxEntriesDBSize)
}
return nil
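The struct change renames the `tapper-resources` YAML key to `worker-resources` and `gui-port` to `proxy-port`. A hedged sketch of how such keys decode into a config struct, using gopkg.in/yaml.v3 directly — the project's actual config loader, default handling, and the exact Resources field tags are assumptions here:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// resources is a guess at the shape behind models.Resources, for illustration.
type resources struct {
	CpuLimit    string `yaml:"cpu-limit"`
	MemoryLimit string `yaml:"memory-limit"`
}

type deployConfig struct {
	ProxyPort       uint16    `yaml:"proxy-port"` // was gui-port
	ProxyHost       string    `yaml:"proxy-host"`
	AllNamespaces   bool      `yaml:"all-namespaces"`
	WorkerResources resources `yaml:"worker-resources"` // was tapper-resources
}

func main() {
	raw := []byte(`
proxy-port: 8899
proxy-host: 127.0.0.1
all-namespaces: false
worker-resources:
  cpu-limit: 750m
  memory-limit: 1Gi
`)
	var cfg deployConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```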

View File

@ -17,7 +17,7 @@ func FormatError(err error) error {
if k8serrors.IsForbidden(err) {
errorNew = fmt.Errorf("insufficient permissions: %w. "+
"supply the required permission or control Kubeshark's access to namespaces by setting %s "+
"in the config file or setting the tapped namespace with --%s %s=<NAMEPSACE>",
"in the config file or setting the targetted namespace with --%s %s=<NAMEPSACE>",
err,
config.ResourcesNamespaceConfigName,
config.SetCommandName,

View File

@ -62,31 +62,31 @@ func (connector *Connector) isReachable(path string) (bool, error) {
}
}
func (connector *Connector) ReportTapperStatus(tapperStatus models.TapperStatus) error {
tapperStatusUrl := fmt.Sprintf("%s/status/tapperStatus", connector.url)
func (connector *Connector) ReportWorkerStatus(workerStatus models.TapperStatus) error {
workerStatusUrl := fmt.Sprintf("%s/status/tapperStatus", connector.url)
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
return fmt.Errorf("Failed Marshal the tapper status %w", err)
if jsonValue, err := json.Marshal(workerStatus); err != nil {
return fmt.Errorf("Failed Marshal the worker status %w", err)
} else {
if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
return fmt.Errorf("Failed sending to Hub the tapped pods %w", err)
if _, err := utils.Post(workerStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
return fmt.Errorf("Failed sending to Hub the targetted pods %w", err)
} else {
log.Debug().Interface("tapper-status", tapperStatus).Msg("Reported to Hub about tapper status:")
log.Debug().Interface("worker-status", workerStatus).Msg("Reported to Hub about Worker status:")
return nil
}
}
}
func (connector *Connector) ReportTappedPods(pods []core.Pod) error {
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", connector.url)
func (connector *Connector) ReportTargettedPods(pods []core.Pod) error {
targettedPodsUrl := fmt.Sprintf("%s/status/tappedPods", connector.url)
if jsonValue, err := json.Marshal(pods); err != nil {
return fmt.Errorf("Failed Marshal the tapped pods %w", err)
return fmt.Errorf("Failed Marshal the targetted pods %w", err)
} else {
if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
return fmt.Errorf("Failed sending to Hub the tapped pods %w", err)
if _, err := utils.Post(targettedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
return fmt.Errorf("Failed sending to Hub the targetted pods %w", err)
} else {
log.Debug().Int("pod-count", len(pods)).Msg("Reported to Hub about tapped pod count:")
log.Debug().Int("pod-count", len(pods)).Msg("Reported to Hub about targetted pod count:")
return nil
}
}
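Note that only the Go identifiers change here; the Hub endpoints are still `/status/tapperStatus` and `/status/tappedPods`. The following is a standalone sketch of what ReportWorkerStatus does at the HTTP level, with stand-in fields for models.TapperStatus and a hypothetical Hub address:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// workerStatus stands in for models.TapperStatus; the JSON tags are guesses.
type workerStatus struct {
	TapperName string `json:"tapperName"`
	NodeName   string `json:"nodeName"`
	Status     string `json:"status"`
}

func reportWorkerStatus(client *http.Client, hubURL string, status workerStatus) error {
	jsonValue, err := json.Marshal(status)
	if err != nil {
		return fmt.Errorf("failed to marshal the worker status: %w", err)
	}
	// The endpoint path is unchanged by this commit.
	workerStatusUrl := fmt.Sprintf("%s/status/tapperStatus", hubURL)
	resp, err := client.Post(workerStatusUrl, "application/json", bytes.NewBuffer(jsonValue))
	if err != nil {
		return fmt.Errorf("failed to send the worker status to Hub: %w", err)
	}
	defer resp.Body.Close()
	return nil
}

func main() {
	client := &http.Client{Timeout: 5 * time.Second}
	status := workerStatus{TapperName: "kubeshark-worker-abc", NodeName: "node-1", Status: "Running"}
	// Against a real Hub this would succeed; without one it just shows the call shape.
	if err := reportWorkerStatus(client, "http://localhost:8898", status); err != nil {
		fmt.Println(err)
	}
}
```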

View File

@ -12,8 +12,8 @@ const (
RoleBindingName = KubesharkResourcesPrefix + "role-binding"
RoleName = KubesharkResourcesPrefix + "role"
ServiceAccountName = KubesharkResourcesPrefix + "service-account"
TapperDaemonSetName = KubesharkResourcesPrefix + "worker-daemon-set"
TapperPodName = KubesharkResourcesPrefix + "worker"
WorkerDaemonSetName = KubesharkResourcesPrefix + "worker-daemon-set"
WorkerPodName = KubesharkResourcesPrefix + "worker"
ConfigMapName = KubesharkResourcesPrefix + "config"
MinKubernetesServerVersion = "1.16.0"
)

View File

@ -1,20 +1,20 @@
package kubernetes
type K8sTapManagerErrorReason string
type K8sDeployManagerErrorReason string
const (
TapManagerTapperUpdateError K8sTapManagerErrorReason = "TAPPER_UPDATE_ERROR"
TapManagerPodWatchError K8sTapManagerErrorReason = "POD_WATCH_ERROR"
TapManagerPodListError K8sTapManagerErrorReason = "POD_LIST_ERROR"
DeployManagerWorkerUpdateError K8sDeployManagerErrorReason = "TAPPER_UPDATE_ERROR"
DeployManagerPodWatchError K8sDeployManagerErrorReason = "POD_WATCH_ERROR"
DeployManagerPodListError K8sDeployManagerErrorReason = "POD_LIST_ERROR"
)
type K8sTapManagerError struct {
OriginalError error
TapManagerReason K8sTapManagerErrorReason
type K8sDeployManagerError struct {
OriginalError error
DeployManagerReason K8sDeployManagerErrorReason
}
// K8sTapManagerError implements the Error interface.
func (e *K8sTapManagerError) Error() string {
// K8sDeployManagerError implements the Error interface.
func (e *K8sDeployManagerError) Error() string {
return e.OriginalError.Error()
}
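The renamed error type keeps the same shape: a reason constant plus the wrapped original error (note that the `TAPPER_UPDATE_ERROR` string value is unchanged). A self-contained illustration of how it behaves with errors.As and %v formatting:

```go
package main

import (
	"errors"
	"fmt"
)

type K8sDeployManagerErrorReason string

const (
	DeployManagerWorkerUpdateError K8sDeployManagerErrorReason = "TAPPER_UPDATE_ERROR" // string value kept from the old constant
	DeployManagerPodWatchError     K8sDeployManagerErrorReason = "POD_WATCH_ERROR"
	DeployManagerPodListError      K8sDeployManagerErrorReason = "POD_LIST_ERROR"
)

type K8sDeployManagerError struct {
	OriginalError       error
	DeployManagerReason K8sDeployManagerErrorReason
}

// K8sDeployManagerError implements the error interface by delegating to the
// wrapped error.
func (e *K8sDeployManagerError) Error() string {
	return e.OriginalError.Error()
}

func main() {
	err := error(&K8sDeployManagerError{
		OriginalError:       errors.New("pod watch closed unexpectedly"),
		DeployManagerReason: DeployManagerPodWatchError,
	})

	var deployErr *K8sDeployManagerError
	if errors.As(err, &deployErr) {
		fmt.Printf("reason=%s: %v\n", deployErr.DeployManagerReason, deployErr)
	}
}
```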

View File

@ -15,32 +15,32 @@ import (
core "k8s.io/api/core/v1"
)
const updateTappersDelay = 5 * time.Second
const updateWorkersDelay = 5 * time.Second
type TappedPodChangeEvent struct {
type TargettedPodChangeEvent struct {
Added []core.Pod
Removed []core.Pod
}
// KubesharkTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
type KubesharkTapperSyncer struct {
// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
type WorkerSyncer struct {
startTime time.Time
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
CurrentlyTargettedPods []core.Pod
config WorkerSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
TapperStatusChangedOut chan models.TapperStatus
ErrorOut chan K8sTapManagerError
nodeToTappedPodMap models.NodeToPodsMap
tappedNodes []string
DeployPodChangesOut chan TargettedPodChangeEvent
WorkerStatusChangedOut chan models.TapperStatus
ErrorOut chan K8sDeployManagerError
nodeToTargettedPodMap models.NodeToPodsMap
targettedNodes []string
}
type TapperSyncerConfig struct {
type WorkerSyncerConfig struct {
TargetNamespaces []string
PodFilterRegex regexp.Regexp
KubesharkResourcesNamespace string
TapperResources models.Resources
WorkerResources models.Resources
ImagePullPolicy core.PullPolicy
LogLevel zerolog.Level
KubesharkApiFilteringOptions api.TrafficFilteringOptions
@ -50,36 +50,36 @@ type TapperSyncerConfig struct {
MaxLiveStreams int
}
func CreateAndStartKubesharkTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig, startTime time.Time) (*KubesharkTapperSyncer, error) {
syncer := &KubesharkTapperSyncer{
func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
syncer := &WorkerSyncer{
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTappedPods: make([]core.Pod, 0),
CurrentlyTargettedPods: make([]core.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TappedPodChangeEvent, 100),
TapperStatusChangedOut: make(chan models.TapperStatus, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
DeployPodChangesOut: make(chan TargettedPodChangeEvent, 100),
WorkerStatusChangedOut: make(chan models.TapperStatus, 100),
ErrorOut: make(chan K8sDeployManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTappedPods(); err != nil {
if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil {
return nil, err
}
if err := syncer.updateKubesharkTappers(); err != nil {
if err := syncer.updateWorkers(); err != nil {
return nil, err
}
go syncer.watchPodsForTapping()
go syncer.watchTapperEvents()
go syncer.watchTapperPods()
go syncer.watchPodsForTargetting()
go syncer.watchWorkerEvents()
go syncer.watchWorkerPods()
return syncer, nil
}
func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() {
kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName))
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, kubesharkResourceRegex)
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, []string{tapperSyncer.config.KubesharkResourcesNamespace}, podWatchHelper)
func (workerSyncer *WorkerSyncer) watchWorkerPods() {
kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, kubesharkResourceRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.KubesharkResourcesNamespace}, podWatchHelper)
for {
select {
@ -91,7 +91,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() {
pod, err := wEvent.ToPod()
if err != nil {
log.Error().Str("pod", TapperPodName).Err(err).Msg("While parsing Kubeshark resource!")
log.Error().Str("pod", WorkerPodName).Err(err).Msg("While parsing Kubeshark resource!")
continue
}
@ -101,8 +101,8 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() {
Interface("phase", pod.Status.Phase).
Msg("Watching pod events...")
if pod.Spec.NodeName != "" {
tapperStatus := models.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
tapperSyncer.TapperStatusChangedOut <- tapperStatus
workerStatus := models.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
workerSyncer.WorkerStatusChangedOut <- workerStatus
}
case err, ok := <-errorChan:
@ -110,21 +110,21 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() {
errorChan = nil
continue
}
log.Error().Str("pod", TapperPodName).Err(err).Msg("While watching pod!")
log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!")
case <-tapperSyncer.context.Done():
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", TapperPodName).
Str("pod", WorkerPodName).
Msg("Watching pod, context done.")
return
}
}
}
func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() {
kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName))
eventWatchHelper := NewEventWatchHelper(tapperSyncer.kubernetesProvider, kubesharkResourceRegex, "pod")
eventChan, errorChan := FilteredWatch(tapperSyncer.context, eventWatchHelper, []string{tapperSyncer.config.KubesharkResourcesNamespace}, eventWatchHelper)
func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, kubesharkResourceRegex, "pod")
eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.KubesharkResourcesNamespace}, eventWatchHelper)
for {
select {
@ -137,14 +137,14 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() {
event, err := wEvent.ToEvent()
if err != nil {
log.Error().
Str("pod", TapperPodName).
Str("pod", WorkerPodName).
Err(err).
Msg("Parsing resource event.")
continue
}
log.Debug().
Str("pod", TapperPodName).
Str("pod", WorkerPodName).
Str("event", event.Name).
Time("time", event.CreationTimestamp.Time).
Str("name", event.Regarding.Name).
@ -153,7 +153,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() {
Str("note", event.Note).
Msg("Watching events.")
pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.KubesharkResourcesNamespace, event.Regarding.Name)
pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.KubesharkResourcesNamespace, event.Regarding.Name)
if err1 != nil {
log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod")
continue
@ -166,8 +166,8 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() {
nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0]
}
tapperStatus := models.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)}
tapperSyncer.TapperStatusChangedOut <- tapperStatus
workerStatus := models.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)}
workerSyncer.WorkerStatusChangedOut <- workerStatus
case err, ok := <-errorChan:
if !ok {
@ -176,44 +176,44 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() {
}
log.Error().
Str("pod", TapperPodName).
Str("pod", WorkerPodName).
Err(err).
Msg("While watching events.")
case <-tapperSyncer.context.Done():
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", TapperPodName).
Str("pod", WorkerPodName).
Msg("Watching pod events, context done.")
return
}
}
}
func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
func (workerSyncer *WorkerSyncer) watchPodsForTargetting() {
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
handleChangeInPods := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
err, changeFound := workerSyncer.updateCurrentlyTargettedPods()
if err != nil {
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodListError,
workerSyncer.ErrorOut <- K8sDeployManagerError{
OriginalError: err,
DeployManagerReason: DeployManagerPodListError,
}
}
if !changeFound {
log.Debug().Msg("Nothing changed. Updating tappers is not needed.")
log.Debug().Msg("Nothing changed. Updating workers is not needed.")
return
}
if err := tapperSyncer.updateKubesharkTappers(); err != nil {
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerTapperUpdateError,
if err := workerSyncer.updateWorkers(); err != nil {
workerSyncer.ErrorOut <- K8sDeployManagerError{
OriginalError: err,
DeployManagerReason: DeployManagerWorkerUpdateError,
}
}
}
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, handleChangeInPods)
restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods)
for {
select {
@ -225,7 +225,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
}
@ -235,24 +235,24 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Added matching pod.")
if err := restartTappersDebouncer.SetOn(); err != nil {
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting tappers!")
Msg("While restarting workers!")
}
case EventDeleted:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Removed matching pod.")
if err := restartTappersDebouncer.SetOn(); err != nil {
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting tappers!")
Msg("While restarting workers!")
}
case EventModified:
log.Debug().
@ -269,12 +269,12 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
if err := restartTappersDebouncer.SetOn(); err != nil {
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting tappers!")
Msg("While restarting workers!")
}
}
case EventBookmark:
@ -288,33 +288,33 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() {
continue
}
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
case <-tapperSyncer.context.Done():
log.Debug().Msg("Watching pods, context done. Stopping \"restart tappers debouncer\"")
restartTappersDebouncer.Cancel()
case <-workerSyncer.context.Done():
log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
return
}
}
}
func (tapperSyncer *KubesharkTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) {
log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart tappers debouncer\"")
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) {
log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
workerSyncer.ErrorOut <- K8sDeployManagerError{
OriginalError: err,
DeployManagerReason: DeployManagerPodWatchError,
}
}
func (tapperSyncer *KubesharkTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) {
if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil {
func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) {
if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
return err, false
} else {
podsToTap := excludeKubesharkPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(tapperSyncer.CurrentlyTappedPods, podsToTap)
podsToTarget := excludeSelfPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget)
for _, addedPod := range addedPods {
log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:")
}
@ -322,9 +322,9 @@ func (tapperSyncer *KubesharkTapperSyncer) updateCurrentlyTappedPods() (err erro
log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.")
}
if len(addedPods) > 0 || len(removedPods) > 0 {
tapperSyncer.CurrentlyTappedPods = podsToTap
tapperSyncer.nodeToTappedPodMap = GetNodeHostToTappedPodsMap(tapperSyncer.CurrentlyTappedPods)
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
workerSyncer.CurrentlyTargettedPods = podsToTarget
workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods)
workerSyncer.DeployPodChangesOut <- TargettedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
}
@ -334,70 +334,70 @@ func (tapperSyncer *KubesharkTapperSyncer) updateCurrentlyTappedPods() (err erro
}
}
func (tapperSyncer *KubesharkTapperSyncer) updateKubesharkTappers() error {
nodesToTap := make([]string, len(tapperSyncer.nodeToTappedPodMap))
func (workerSyncer *WorkerSyncer) updateWorkers() error {
nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap))
i := 0
for node := range tapperSyncer.nodeToTappedPodMap {
nodesToTap[i] = node
for node := range workerSyncer.nodeToTargettedPodMap {
nodesToTarget[i] = node
i++
}
if utils.EqualStringSlices(nodesToTap, tapperSyncer.tappedNodes) {
if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) {
log.Debug().Msg("Skipping apply, DaemonSet is up to date")
return nil
}
log.Debug().Strs("nodes", nodesToTap).Msg("Updating DaemonSet to run on nodes.")
log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.")
image := "kubeshark/worker:latest"
if len(tapperSyncer.nodeToTappedPodMap) > 0 {
if len(workerSyncer.nodeToTargettedPodMap) > 0 {
var serviceAccountName string
if tapperSyncer.config.KubesharkServiceAccountExists {
if workerSyncer.config.KubesharkServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
nodeNames := make([]string, 0, len(tapperSyncer.nodeToTappedPodMap))
for nodeName := range tapperSyncer.nodeToTappedPodMap {
nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap))
for nodeName := range workerSyncer.nodeToTargettedPodMap {
nodeNames = append(nodeNames, nodeName)
}
if err := tapperSyncer.kubernetesProvider.ApplyKubesharkTapperDaemonSet(
tapperSyncer.context,
tapperSyncer.config.KubesharkResourcesNamespace,
TapperDaemonSetName,
if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.KubesharkResourcesNamespace,
WorkerDaemonSetName,
image,
TapperPodName,
fmt.Sprintf("%s.%s.svc", HubPodName, tapperSyncer.config.KubesharkResourcesNamespace),
WorkerPodName,
fmt.Sprintf("%s.%s.svc", HubPodName, workerSyncer.config.KubesharkResourcesNamespace),
nodeNames,
serviceAccountName,
tapperSyncer.config.TapperResources,
tapperSyncer.config.ImagePullPolicy,
tapperSyncer.config.KubesharkApiFilteringOptions,
tapperSyncer.config.LogLevel,
tapperSyncer.config.ServiceMesh,
tapperSyncer.config.Tls,
tapperSyncer.config.MaxLiveStreams); err != nil {
workerSyncer.config.WorkerResources,
workerSyncer.config.ImagePullPolicy,
workerSyncer.config.KubesharkApiFilteringOptions,
workerSyncer.config.LogLevel,
workerSyncer.config.ServiceMesh,
workerSyncer.config.Tls,
workerSyncer.config.MaxLiveStreams); err != nil {
return err
}
log.Debug().Int("tapper-count", len(tapperSyncer.nodeToTappedPodMap)).Msg("Successfully created tappers.")
log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.")
} else {
if err := tapperSyncer.kubernetesProvider.ResetKubesharkTapperDaemonSet(
tapperSyncer.context,
tapperSyncer.config.KubesharkResourcesNamespace,
TapperDaemonSetName,
if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.KubesharkResourcesNamespace,
WorkerDaemonSetName,
image,
TapperPodName); err != nil {
WorkerPodName); err != nil {
return err
}
log.Debug().Msg("Successfully reset tapper daemon set")
log.Debug().Msg("Successfully resetted Worker DaemonSet")
}
tapperSyncer.tappedNodes = nodesToTap
workerSyncer.targettedNodes = nodesToTarget
return nil
}
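Pod add/remove/modify events above are funneled through a debouncer so that a burst of changes triggers a single updateWorkers call after updateWorkersDelay. The sketch below is a simplified, trailing-edge analogue of that idea, not the project's debounce package (whose exact semantics may differ):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer coalesces bursts of SetOn calls into one callback invocation.
type debouncer struct {
	mu       sync.Mutex
	delay    time.Duration
	timer    *time.Timer
	callback func()
}

// SetOn (re)arms the timer; the callback fires once the burst goes quiet.
func (d *debouncer) SetOn() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.timer != nil {
		d.timer.Stop()
	}
	d.timer = time.AfterFunc(d.delay, d.callback)
}

func main() {
	d := &debouncer{delay: 200 * time.Millisecond, callback: func() {
		fmt.Println("updating workers once for the whole burst of pod events")
	}}
	for i := 0; i < 5; i++ { // five rapid pod events collapse into one update
		d.SetOn()
	}
	time.Sleep(500 * time.Millisecond)
}
```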

View File

@ -810,17 +810,17 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
return nil
}
func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, hubPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel zerolog.Level, serviceMesh bool, tls bool, maxLiveStreams int) error {
func (provider *Provider) ApplyWorkerDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, workerPodName string, hubPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel zerolog.Level, serviceMesh bool, tls bool, maxLiveStreams int) error {
log.Debug().
Int("node-count", len(nodeNames)).
Str("namespace", namespace).
Str("daemonset-name", daemonSetName).
Str("image", podImage).
Str("pod", tapperPodName).
Msg("Applying tapper DaemonSets.")
Str("pod", workerPodName).
Msg("Applying worker DaemonSets.")
if len(nodeNames) == 0 {
return fmt.Errorf("daemon set %s must tap at least 1 pod", daemonSetName)
return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName)
}
kubesharkApiFilteringOptionsJsonStr, err := json.Marshal(kubesharkApiFilteringOptions)
@ -849,7 +849,7 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam
}
workerContainer := applyconfcore.Container()
workerContainer.WithName(tapperPodName)
workerContainer.WithName(workerPodName)
workerContainer.WithImage(podImage)
workerContainer.WithImagePullPolicy(imagePullPolicy)
@ -887,19 +887,19 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam
)
cpuLimit, err := resource.ParseQuantity(resources.CpuLimit)
if err != nil {
return fmt.Errorf("invalid cpu limit for %s container", tapperPodName)
return fmt.Errorf("invalid cpu limit for %s container", workerPodName)
}
memLimit, err := resource.ParseQuantity(resources.MemoryLimit)
if err != nil {
return fmt.Errorf("invalid memory limit for %s container", tapperPodName)
return fmt.Errorf("invalid memory limit for %s container", workerPodName)
}
cpuRequests, err := resource.ParseQuantity(resources.CpuRequests)
if err != nil {
return fmt.Errorf("invalid cpu request for %s container", tapperPodName)
return fmt.Errorf("invalid cpu request for %s container", workerPodName)
}
memRequests, err := resource.ParseQuantity(resources.MemoryRequests)
if err != nil {
return fmt.Errorf("invalid memory request for %s container", tapperPodName)
return fmt.Errorf("invalid memory request for %s container", workerPodName)
}
workerResourceLimits := core.ResourceList{
"cpu": cpuLimit,
@ -967,14 +967,14 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": tapperPodName,
"app": workerPodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
})
podTemplate.WithSpec(podSpec)
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
labelSelector.WithMatchLabels(map[string]string{"app": workerPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
@ -993,9 +993,9 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam
return err
}
func (provider *Provider) ResetKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string) error {
func (provider *Provider) ResetWorkerDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, workerPodName string) error {
workerContainer := applyconfcore.Container()
workerContainer.WithName(tapperPodName)
workerContainer.WithName(workerPodName)
workerContainer.WithImage(podImage)
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
@ -1016,14 +1016,14 @@ func (provider *Provider) ResetKubesharkTapperDaemonSet(ctx context.Context, nam
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": tapperPodName,
"app": workerPodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
})
podTemplate.WithSpec(podSpec)
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
labelSelector.WithMatchLabels(map[string]string{"app": workerPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
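ApplyWorkerDaemonSet parses the worker container's CPU/memory limits and requests with apimachinery's resource.ParseQuantity before building the container spec. A standalone sketch of that parsing step (assumes k8s.io/apimachinery is available; the quantity values are examples, not the project's defaults):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	workerPodName := "kubeshark-worker"
	for _, q := range []struct{ what, value string }{
		{"cpu limit", "750m"},
		{"memory limit", "1Gi"},
		{"cpu request", "50m"},
		{"memory request", "50Mi"},
	} {
		quantity, err := resource.ParseQuantity(q.value)
		if err != nil {
			fmt.Printf("invalid %s for %s container: %v\n", q.what, workerPodName, err)
			continue
		}
		fmt.Printf("%s for %s container: %s\n", q.what, workerPodName, quantity.String())
	}
}
```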

View File

@ -8,19 +8,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func GetNodeHostToTappedPodsMap(tappedPods []core.Pod) models.NodeToPodsMap {
nodeToTappedPodMap := make(models.NodeToPodsMap)
for _, pod := range tappedPods {
func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap {
nodeToTargettedPodsMap := make(models.NodeToPodsMap)
for _, pod := range targettedPods {
minimizedPod := getMinimizedPod(pod)
existingList := nodeToTappedPodMap[pod.Spec.NodeName]
existingList := nodeToTargettedPodsMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTappedPodMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
} else {
nodeToTappedPodMap[pod.Spec.NodeName] = append(nodeToTappedPodMap[pod.Spec.NodeName], minimizedPod)
nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod)
}
}
return nodeToTappedPodMap
return nodeToTargettedPodsMap
}
func getMinimizedPod(fullPod core.Pod) core.Pod {
@ -48,7 +48,7 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus {
return result
}
func excludeKubesharkPods(pods []core.Pod) []core.Pod {
func excludeSelfPods(pods []core.Pod) []core.Pod {
kubesharkPrefixRegex := regexp.MustCompile("^" + KubesharkResourcesPrefix)
nonKubesharkPods := make([]core.Pod, 0)

View File

@ -107,8 +107,8 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveDaemonSet(ctx, kubesharkResourcesNamespace, kubernetes.TapperDaemonSetName); err != nil {
resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", kubernetes.TapperDaemonSetName, kubesharkResourcesNamespace)
if err := kubernetesProvider.RemoveDaemonSet(ctx, kubesharkResourcesNamespace, kubernetes.WorkerDaemonSetName); err != nil {
resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", kubernetes.WorkerDaemonSetName, kubesharkResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}

View File

@ -13,7 +13,7 @@ import (
core "k8s.io/api/core/v1"
)
func CreateTapKubesharkResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, maxEntriesDBSizeBytes int64, hubResources models.Resources, imagePullPolicy core.PullPolicy, logLevel zerolog.Level, profiler bool) (bool, error) {
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, maxEntriesDBSizeBytes int64, hubResources models.Resources, imagePullPolicy core.PullPolicy, logLevel zerolog.Level, profiler bool) (bool, error) {
if !isNsRestrictedMode {
if err := createKubesharkNamespace(ctx, kubernetesProvider, kubesharkResourcesNamespace); err != nil {
return false, err